kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests
Date Wed, 26 Jul 2017 06:20:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f15cdbc91 -> 47ee8e954


http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cc061da..cc6a394 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -22,13 +22,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 
+import java.util.Random;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
@@ -50,7 +59,7 @@ public class SelectorTest {
     protected Time time;
     protected Selector selector;
     protected ChannelBuilder channelBuilder;
-    private Metrics metrics;
+    protected Metrics metrics;
 
     @Before
     public void setUp() throws Exception {
@@ -322,6 +331,87 @@ public class SelectorTest {
         assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
     }
 
+    @Test
+    public void testMuteOnOOM() throws Exception {
+        //clean up default selector, replace it with one that uses a finite mem pool
+        selector.close();
+        MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
+        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
+            new HashMap<String, String>(), true, false, channelBuilder, pool);
+
+        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+            ss.bind(new InetSocketAddress(0));
+
+            InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+            Thread sender1 = createSender(serverAddress, randomPayload(900));
+            Thread sender2 = createSender(serverAddress, randomPayload(900));
+            sender1.start();
+            sender2.start();
+
+            //wait until everything has been flushed out to network (assuming payload size is smaller than OS buffer size)
+            //this is important because we assume both requests' prefixes (1st 4 bytes) have made it.
+            sender1.join(5000);
+            sender2.join(5000);
+
+            SocketChannel channelX = ss.accept(); //undefined whether this is sender1 or sender2
+            channelX.configureBlocking(false);
+            SocketChannel channelY = ss.accept();
+            channelY.configureBlocking(false);
+            selector.register("clientX", channelX);
+            selector.register("clientY", channelY);
+
+            List<NetworkReceive> completed = Collections.emptyList();
+            long deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read a single request within timeout", 1, completed.size());
+            NetworkReceive firstReceive = completed.get(0);
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            selector.poll(10);
+            assertTrue(selector.completedReceives().isEmpty());
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            firstReceive.close();
+            assertEquals(900, pool.availableMemory()); //memory has been released back to pool
+
+            completed = Collections.emptyList();
+            deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read a single request within timeout", 1, selector.completedReceives().size());
+            assertEquals(0, pool.availableMemory());
+            assertFalse(selector.isOutOfMemory());
+        }
+    }
+
+    private Thread createSender(InetSocketAddress serverAddress, byte[] payload) {
+        return new PlaintextSender(serverAddress, payload);
+    }
+
+    protected byte[] randomPayload(int sizeBytes) throws Exception {
+        Random random = new Random();
+        byte[] payload = new byte[sizeBytes + 4];
+        random.nextBytes(payload);
+        ByteArrayOutputStream prefixOs = new ByteArrayOutputStream();
+        DataOutputStream prefixDos = new DataOutputStream(prefixOs);
+        prefixDos.writeInt(sizeBytes);
+        prefixDos.flush();
+        prefixDos.close();
+        prefixOs.flush();
+        prefixOs.close();
+        byte[] prefix = prefixOs.toByteArray();
+        System.arraycopy(prefix, 0, payload, 0, prefix.length);
+        return payload;
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);

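A note on the framing used above: randomPayload() builds a size-delimited frame -- a 4-byte big-endian length prefix followed by that many random bytes -- which is the same framing NetworkReceive parses on the broker side. A minimal sketch of reading such a frame back (readFrame is a hypothetical helper, not part of this patch):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        int size = din.readInt();   // the prefix written by prefixDos.writeInt(sizeBytes)
        byte[] body = new byte[size];
        din.readFully(body);        // the payload that follows
        return body;
    }

With a 900-byte SimpleMemoryPool and two 900-byte payloads in flight, the selector can lease a buffer for only one of them at a time, which is what drives the mute-on-OOM assertions in testMuteOnOOM().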
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index e272855..46d3b79 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -17,17 +17,23 @@
 package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -35,6 +41,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,7 +50,6 @@ import org.junit.Test;
  */
 public class SslSelectorTest extends SelectorTest {
 
-    private Metrics metrics;
     private Map<String, Object> sslClientConfigs;
 
     @Before
@@ -160,6 +166,90 @@ public class SslSelectorTest extends SelectorTest {
 
     }
 
+    @Override
+    public void testMuteOnOOM() throws Exception {
+        //clean up default selector, replace it with one that uses a finite mem pool
+        selector.close();
+        MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
+        //the initial channel builder is for clients, we need a server one
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        channelBuilder = new SslChannelBuilder(Mode.SERVER);
+        channelBuilder.configure(sslServerConfigs);
+        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
+                new HashMap<String, String>(), true, false, channelBuilder, pool);
+
+        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+            ss.bind(new InetSocketAddress(0));
+
+            InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+            SslSender sender1 = createSender(serverAddress, randomPayload(900));
+            SslSender sender2 = createSender(serverAddress, randomPayload(900));
+            sender1.start();
+            sender2.start();
+
+            SocketChannel channelX = ss.accept(); //undefined whether this is sender1 or sender2
+            channelX.configureBlocking(false);
+            SocketChannel channelY = ss.accept();
+            channelY.configureBlocking(false);
+            selector.register("clientX", channelX);
+            selector.register("clientY", channelY);
+
+            boolean success = false;
+            NetworkReceive firstReceive = null;
+            long deadline = System.currentTimeMillis() + 5000;
+            //keep calling poll until:
+            //1. both senders have completed the handshakes (so server selector has tried reading both payloads)
+            //2. a single payload is actually read out completely (the other is too big to fit)
+            while (System.currentTimeMillis() < deadline) {
+                selector.poll(10);
+
+                List<NetworkReceive> completed = selector.completedReceives();
+                if (firstReceive == null) {
+                    if (!completed.isEmpty()) {
+                        assertEquals("expecting a single request", 1, completed.size());
+                        firstReceive = completed.get(0);
+                        assertTrue(selector.isMadeReadProgressLastPoll());
+                        assertEquals(0, pool.availableMemory());
+                    }
+                } else {
+                    assertTrue("only expecting single request", completed.isEmpty());
+                }
+
+                boolean handshaked = sender1.waitForHandshake(1);
+                handshaked = handshaked && sender2.waitForHandshake(1);
+
+                if (handshaked && firstReceive != null) {
+                    success = true;
+                    break;
+                }
+            }
+            if (!success) {
+                Assert.fail("could not initiate connections within timeout");
+            }
+
+            selector.poll(10);
+            assertTrue(selector.completedReceives().isEmpty());
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            firstReceive.close();
+            assertEquals(900, pool.availableMemory()); //memory has been released back to pool
+
+            List<NetworkReceive> completed = Collections.emptyList();
+            deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read remaining request within timeout", 1, completed.size());
+            assertEquals(0, pool.availableMemory());
+            assertFalse(selector.isOutOfMemory());
+        }
+    }
+
     /**
      * Connects and waits for handshake to complete. This is required since SslTransportLayer
      * implementation requires the channel to be ready before send is invoked (unlike plaintext
@@ -169,4 +259,7 @@ public class SslSelectorTest extends SelectorTest {
         blockingConnect(node, serverAddr);
     }
 
+    private SslSender createSender(InetSocketAddress serverAddress, byte[] payload) {
+        return new SslSender(serverAddress, payload);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
new file mode 100644
index 0000000..cae69cb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SslSender extends Thread {
+
+    private final InetSocketAddress serverAddress;
+    private final byte[] payload;
+    private final CountDownLatch handshaked = new CountDownLatch(1);
+
+    public SslSender(InetSocketAddress serverAddress, byte[] payload) {
+        this.serverAddress = serverAddress;
+        this.payload = payload;
+        setDaemon(true);
+        setName("SslSender - " + payload.length + " bytes @ " + serverAddress);
+    }
+
+    @Override
+    public void run() {
+        try {
+            SSLContext sc = SSLContext.getInstance("TLSv1.2");
+            sc.init(null, new TrustManager[]{new NaiveTrustManager()}, new java.security.SecureRandom());
+            try (SSLSocket connection = (SSLSocket) sc.getSocketFactory().createSocket(serverAddress.getAddress(), serverAddress.getPort())) {
+                OutputStream os = connection.getOutputStream();
+                connection.startHandshake();
+                handshaked.countDown();
+                os.write(payload);
+                os.flush();
+            }
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+    }
+
+    public boolean waitForHandshake(long timeoutMillis) throws InterruptedException {
+        return handshaked.await(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * blindly trust any certificate presented to it
+     */
+    private static class NaiveTrustManager implements X509TrustManager {
+        @Override
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            //nop
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            //nop
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
+}

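For reference, the selector tests above drive this helper roughly as follows (address and timeout illustrative). Note that NaiveTrustManager disables all certificate validation and is only acceptable in test code:

    byte[] payload = new byte[900];   // e.g. a frame built by SelectorTest.randomPayload(900)
    SslSender sender = new SslSender(new InetSocketAddress("localhost", 9093), payload);
    sender.start();                   // connects, handshakes, then writes the payload
    if (!sender.waitForHandshake(5000))
        throw new AssertionError("handshake did not complete in time");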
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index bb5d2a7..8338ad7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -39,6 +39,7 @@ import javax.net.ssl.SSLParameters;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
@@ -580,7 +581,7 @@ public class SslTransportLayerTest {
     public void testNetworkThreadTimeRecorded() throws Exception {
         selector.close();
         this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
-                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
+                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder,
MemoryPool.NONE);
 
         String node = "0";
         server = createEchoServer(SecurityProtocol.SSL);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
new file mode 100644
index 0000000..2b2cc91
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ProtoUtilsTest {
+    @Test
+    public void testDelayedAllocationSchemaDetection() throws Exception {
+        //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
+        for (ApiKeys key : ApiKeys.values()) {
+            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP) {
+                Assert.assertTrue(Protocol.requiresDelayedDeallocation(key.id));
+            } else {
+                Assert.assertFalse(Protocol.requiresDelayedDeallocation(key.id));
+            }
+        }
+
+    }
+}

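Protocol.requiresDelayedDeallocation itself is added elsewhere in this patch and is not shown in this message. Per the comment added to RequestChannel below, it flags request types whose parsed form keeps pointing into the request buffer (schemas with BYTES or NULLABLE_BYTES fields). A hypothetical stand-in for the behavior the test pins down:

    import java.util.EnumSet;
    import org.apache.kafka.common.protocol.ApiKeys;

    // hypothetical equivalent: the APIs whose parsed form retains a reference
    // into the underlying request buffer
    static final EnumSet<ApiKeys> DELAYED_DEALLOCATION_APIS =
            EnumSet.of(ApiKeys.PRODUCE, ApiKeys.JOIN_GROUP, ApiKeys.SYNC_GROUP);

    static boolean requiresDelayedDeallocation(ApiKeys api) {
        return DELAYED_DEALLOCATION_APIS.contains(api);
    }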
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2d6d05c..3feeff2 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.Random;
 
 import static org.apache.kafka.common.utils.Utils.formatAddress;
+import static org.apache.kafka.common.utils.Utils.formatBytes;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.junit.Assert.assertArrayEquals;
@@ -78,6 +79,17 @@ public class UtilsTest {
     }
 
     @Test
+    public void testFormatBytes() {
+        assertEquals("-1", formatBytes(-1));
+        assertEquals("1023 B", formatBytes(1023));
+        assertEquals("1 KB", formatBytes(1024));
+        assertEquals("1024 KB", formatBytes((1024 * 1024) - 1));
+        assertEquals("1 MB", formatBytes(1024 * 1024));
+        assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
+        assertEquals("10 MB", formatBytes(10 * 1024 * 1024));
+    }
+
+    @Test
     public void testJoin() {
         assertEquals("", Utils.join(Collections.emptyList(), ","));
         assertEquals("1", Utils.join(Arrays.asList("1"), ","));

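The Utils.formatBytes implementation under test is not part of this hunk; a sketch consistent with the asserted cases scales by powers of 1024 and trims trailing zeros (sketch only, not necessarily the committed code):

    import java.text.DecimalFormat;
    import java.text.DecimalFormatSymbols;
    import java.util.Locale;

    static final String[] SUFFIXES = {"B", "KB", "MB", "GB", "TB", "PB", "EB"};
    static final DecimalFormat TWO_DIGITS =
            new DecimalFormat("0.##", DecimalFormatSymbols.getInstance(Locale.ROOT));

    static String formatBytes(long bytes) {
        if (bytes < 0)
            return String.valueOf(bytes);  // formatBytes(-1) -> "-1"
        if (bytes < 1024)
            return bytes + " B";           // 1023 -> "1023 B"
        int ordinal = (int) Math.floor(Math.log(bytes) / Math.log(1024.0));
        double scaled = bytes / Math.pow(1024.0, ordinal);
        // "0.##" keeps at most two decimals: 1.0 -> "1", 1.0999... -> "1.1", 1023.999 -> "1024"
        return TWO_DIGITS.format(scaled) + " " + SUFFIXES[ordinal];
    }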
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index bd71340..6b8dbaa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -29,9 +29,10 @@ import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -41,7 +42,7 @@ import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
   val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-    buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
+    buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 0, listenerName = new ListenerName(""),
     securityProtocol = SecurityProtocol.PLAINTEXT)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -56,10 +57,12 @@ object RequestChannel extends Logging {
     val sanitizedUser = QuotaId.sanitize(principal.getName)
   }
 
-  case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
-                     startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+  case class Request(processor: Int, connectionId: String, session: Session, buffer: ByteBuffer,
+                     private val memoryPool: MemoryPool, startTimeNanos: Long, listenerName: ListenerName,
+                     securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
+    @volatile var bufferReference = buffer
     @volatile var requestDequeueTimeNanos = -1L
     @volatile var apiLocalCompleteTimeNanos = -1L
     @volatile var responseCompleteTimeNanos = -1L
@@ -104,7 +107,12 @@ object RequestChannel extends Logging {
       else
         null
 
-    buffer = null
+    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    if (!Protocol.requiresDelayedDeallocation(requestId)) {
+      dispose()
+    }
 
     def requestDesc(details: Boolean): String = {
       if (requestObj != null)
@@ -194,6 +202,13 @@ object RequestChannel extends Logging {
           .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
       }
     }
+
+    def dispose(): Unit = {
+      if (bufferReference != null) {
+        memoryPool.release(bufferReference)
+        bufferReference = null
+      }
+    }
   }
 
   object Response {

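Taken together, the RequestChannel changes define the buffer lifecycle this patch introduces: a network thread leases a buffer from the pool, the parsed Request either releases it as soon as parsing completes (most APIs) or holds it until dispose() runs after handling (produce, join-group, sync-group), and a depleted pool causes the selector to stop reading until memory is returned. A rough sketch against the MemoryPool interface (tryAllocate is an assumed name for the lease call; release, availableMemory and isOutOfMemory appear verbatim in the diffs):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.MemoryPool;
    import org.apache.kafka.common.memory.SimpleMemoryPool;

    static void lifecycleSketch() {
        MemoryPool pool = new SimpleMemoryPool(1024 * 1024, 64 * 1024, false, null);
        ByteBuffer buffer = pool.tryAllocate(16 * 1024); // assumed: null when depleted -> selector mutes reads
        if (buffer != null) {
            try {
                // ... read a request into buffer and hand it off to a handler thread ...
            } finally {
                pool.release(buffer); // capacity returns to the pool; reading can resume
            }
        }
    }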
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 2ba5553..e541015 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -32,7 +32,9 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -61,6 +63,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
 
+  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
+  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
+  memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
+  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
   val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
   private val processors = new Array[Processor](totalProcessorThreads)
 
@@ -86,7 +92,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         val processorEndIndex = processorBeginIndex + numProcessorThreads
 
         for (i <- processorBeginIndex until processorEndIndex)
-          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
+          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
 
         val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
           processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
@@ -109,7 +115,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         }.sum / totalProcessorThreads
       }
     )
-
+    newGauge("MemoryPoolAvailable",
+      new Gauge[Long] {
+        def value = memoryPool.availableMemory()
+      }
+    )
+    newGauge("MemoryPoolUsed",
+      new Gauge[Long] {
+        def value = memoryPool.size() - memoryPool.availableMemory()
+      }
+    )
     info("Started " + acceptors.size + " acceptor threads")
   }
 
@@ -138,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   /* `protected` for test usage */
   protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                      securityProtocol: SecurityProtocol): Processor = {
+                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
     new Processor(id,
       time,
       config.socketRequestMaxBytes,
@@ -149,7 +164,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       securityProtocol,
       config,
       metrics,
-      credentialProvider
+      credentialProvider,
+      memoryPool
     )
   }
 
@@ -378,7 +394,8 @@ private[kafka] class Processor(val id: Int,
                                securityProtocol: SecurityProtocol,
                                config: KafkaConfig,
                                metrics: Metrics,
-                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               credentialProvider: CredentialProvider,
+                               memoryPool: MemoryPool) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -422,7 +439,8 @@ private[kafka] class Processor(val id: Int,
     metricTags,
     false,
     true,
-    ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
+    ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache),
+    memoryPool)
 
   override def run() {
     startupComplete()
@@ -517,7 +535,8 @@ private[kafka] class Processor(val id: Int,
 
         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
           buffer = receive.payload, startTimeNanos = time.nanoseconds,
-          listenerName = listenerName, securityProtocol = securityProtocol)
+          listenerName = listenerName, securityProtocol = securityProtocol,
+          memoryPool = memoryPool)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3941e17..a900e6d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -52,6 +52,7 @@ object Defaults {
   val NumIoThreads = 8
   val BackgroundThreads = 10
   val QueuedMaxRequests = 500
+  val QueuedMaxRequestBytes = -1
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassName = ""
@@ -236,6 +237,7 @@ object KafkaConfig {
   val NumIoThreadsProp = "num.io.threads"
   val BackgroundThreadsProp = "background.threads"
   val QueuedMaxRequestsProp = "queued.max.requests"
+  val QueuedMaxBytesProp = "queued.max.request.bytes"
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -420,6 +422,7 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
   val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+  val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
@@ -684,6 +687,7 @@ object KafkaConfig {
       .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
       .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
       .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
+      .define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc)
       .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
 
       /************* Authorizer Configuration ***********/
@@ -900,6 +904,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
+  val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp)
   val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
   val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
   val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
@@ -1191,5 +1196,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
     require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
       s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
+    require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
+      s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
   }
 }

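The new setting defaults to -1 (bound disabled); when enabled it must be at least socket.request.max.bytes so that a single maximum-size request can always be buffered. An illustrative server.properties fragment (values are examples; 104857600 is the socket.request.max.bytes default):

    # bound the memory held by in-flight requests at 200 MB
    queued.max.request.bytes=209715200
    # must be <= queued.max.request.bytes when the bound is enabled
    socket.request.max.bytes=104857600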
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index feb07b8..512be67 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -40,9 +40,9 @@ class KafkaRequestHandler(id: Int,
   private val latch = new CountDownLatch(1)
 
   def run() {
-    while (true) {
+    while(true) {
+      var req : RequestChannel.Request = null
       try {
-        var req : RequestChannel.Request = null
         while (req == null) {
           // We use a single meter for aggregate idle percentage for the thread pool.
           // Since meter is calculated as total_recorded_value / time_window and
@@ -69,6 +69,9 @@ class KafkaRequestHandler(id: Int,
           latch.countDown()
           Exit.exit(e.statusCode)
         case e: Throwable => error("Exception when handling request", e)
+      } finally {
+        if (req != null)
+          req.dispose()
       }
     }
   }

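The reshuffle above exists so that req is visible to the new finally block: dispose() then releases any still-held pooled buffer on success, failure, and shutdown paths alike. The general shape of the fix, as a Java sketch with hypothetical helpers:

    // hoist the reference above the try so cleanup can see it
    RequestChannel.Request req = null;
    try {
        req = pollNextRequest();   // hypothetical; the Scala loop polls the request channel
        handle(req);               // hypothetical handler dispatch
    } finally {
        if (req != null)
            req.dispose();         // releases the pooled buffer if still referenced
    }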
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index acf96e8..ed35269 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -29,6 +29,7 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
@@ -328,9 +329,9 @@ class SocketServerTest extends JUnitSuite {
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
+          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
             conn.close()
             super.sendResponse(response, responseSend)

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ae1cfc0..38d4bb3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -35,6 +35,7 @@ import kafka.server._
 import kafka.utils.{MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -395,7 +396,7 @@ class KafkaApisTest {
     val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
     val buffer = request.serialize(header)
     val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
-    (request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
+    (request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
   }
 
   private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1b801de..dee6e87 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -545,6 +545,7 @@ class KafkaConfigTest {
        case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
        case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
        case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
        case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string

