kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1228480 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ConsoleConsumer.scala main/scala/kafka/network/SocketServer.scala main/scala/kafka/server/KafkaServer.scala test/scala/unit/kafka/network/SocketServerTest.scala
Date Fri, 06 Jan 2012 22:50:30 GMT
Author: junrao
Date: Fri Jan  6 22:50:29 2012
New Revision: 1228480

URL: http://svn.apache.org/viewvc?rev=1228480&view=rev
Log:
Support configurable send / receive socket buffer size in server; patched by John Fung; reviewed
by Jun Rao; KAFKA-200

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1228480&r1=1228479&r2=1228480&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Fri Jan
 6 22:50:29 2012
@@ -96,7 +96,7 @@ object ConsoleConsumer extends Logging {
     
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
-    props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
     props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
     props.put("auto.commit", "true")
     props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1228480&r1=1228479&r2=1228480&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Fri Jan  6
22:50:29 2012
@@ -38,11 +38,13 @@ class SocketServer(val port: Int,
                    val numProcessorThreads: Int,
                    monitoringPeriodSecs: Int,
                    private val handlerFactory: Handler.HandlerMapping,
+                   val sendBufferSize: Int,
+                   val receiveBufferSize: Int,
                    val maxRequestSize: Int = Int.MaxValue) {
 
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
-  private var acceptor: Acceptor = new Acceptor(port, processors)
+  private var acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize)
   val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
 
   /**
@@ -116,7 +118,7 @@ private[kafka] abstract class AbstractSe
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends
AbstractServerThread {
+private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val
sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
 
   /**
    * Accept loop that checks for new connection attempts
@@ -164,14 +166,21 @@ private[kafka] class Acceptor(val port: 
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
-    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
-    if(logger.isDebugEnabled)
-      logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + "
on " + socketChannel.socket.getLocalSocketAddress)
+    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
+    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
+    
+    val socketChannel = serverSocketChannel.accept()
     socketChannel.configureBlocking(false)
     socketChannel.socket().setTcpNoDelay(true)
+    socketChannel.socket().setSendBufferSize(sendBufferSize)
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() 
+          + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
+    }
+
     processor.accept(socketChannel)
   }
-
 }
 
 /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1228480&r1=1228479&r2=1228480&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Fri Jan  6 22:50:29
2012
@@ -66,6 +66,8 @@ class KafkaServer(val config: KafkaConfi
                                     config.numThreads,
                                     config.monitoringPeriodSecs,
                                     handlers.handlerFor,
+                                    config.socketSendBuffer,
+                                    config.socketReceiveBuffer,                         
          
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
     socketServer.startup()

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1228480&r1=1228479&r2=1228480&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Fri
Jan  6 22:50:29 2012
@@ -42,6 +42,8 @@ class SocketServerTest extends JUnitSuit
                                 numProcessorThreads = 1, 
                                 monitoringPeriodSecs = 30, 
                                 handlerFactory = (requestId: Short, receive: Receive) =>
echo, 
+                                sendBufferSize = 300000,
+                                receiveBufferSize = 300000,
                                 maxRequestSize = 50)
   server.startup()
 



Mime
View raw message