kafka-commits mailing list archives

From jjko...@apache.org
Subject [1/2] kafka git commit: Revert "KAFKA-2120; Add a request timeout to NetworkClient (KIP-19); reviewed by Jason Gustafson, Ismael Juma, Joel Koshy, Jun Rao, and Edward Ribeiro"
Date Thu, 17 Sep 2015 21:36:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk da39931af -> 9dbeb71ab


http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index dcc52b6..5b2e4ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -40,8 +40,6 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
 public class RecordAccumulatorTest {
@@ -65,18 +63,17 @@ public class RecordAccumulatorTest {
     private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private final long maxBlockTimeMs = 1000;
 
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time,  metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time,  metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1),
accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1),
Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -94,16 +91,16 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
-        accum.append(tp1, key, value, null, maxBlockTimeMs);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -120,12 +117,12 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, null, maxBlockTimeMs);
+                accum.append(tp, key, value, null);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1),
accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -139,14 +136,14 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, null, maxBlockTimeMs);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -180,13 +177,13 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -195,14 +192,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, null, maxBlockTimeMs);
+            accum.append(tp3, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time",
lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, null, maxBlockTimeMs);
+            accum.append(tp2, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -214,10 +211,10 @@ public class RecordAccumulatorTest {
     public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags);
 
         long now = time.milliseconds();
-        accum.append(tp1, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -229,7 +226,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);
 
         // Put message for partition 1 into accumulator
-        accum.append(tp2, key, value, null, maxBlockTimeMs);
+        accum.append(tp2, key, value, null);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
 
@@ -251,9 +248,9 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         
@@ -275,7 +272,7 @@ public class RecordAccumulatorTest {
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -284,7 +281,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), maxBlockTimeMs);
+            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
@@ -294,24 +291,4 @@ public class RecordAccumulatorTest {
 
     }
 
-    @Test
-    public void testExpiredBatches() throws InterruptedException {
-        Time time = new SystemTime();
-        long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time, metricTags);
-        int appends = 1024 / msgSize;
-        for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, null, maxBlockTimeMs);
-            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
-        }
-        time.sleep(2000);
-        accum.ready(cluster, now);
-        accum.append(tp1, key, value, null, 0);
-        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node1),
readyNodes);
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
-        now = time.milliseconds();
-        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
-        assertEquals(1, expiredBatches.size());
-    }
 }
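
Net effect of this revert on the API exercised above: RecordAccumulator.append() loses its maxBlockTimeMs parameter again, and the constructor regains the blockOnBufferFull boolean. A minimal compilable sketch of the restored shapes; only the constructor and append() signatures come from the diff, the class name and setup values are illustrative:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.utils.MockTime;

    public class AppendShapeSketch {
        public static void main(String[] args) throws Exception {
            MockTime time = new MockTime();
            Metrics metrics = new Metrics(time);
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            RecordAccumulator accum = new RecordAccumulator(
                    1024,                 // batch size
                    10 * 1024,            // total buffer memory
                    CompressionType.NONE,
                    10L,                  // lingerMs
                    100L,                 // retryBackoffMs
                    false,                // blockOnBufferFull, restored by this revert
                    metrics, time, metricTags);
            // No maxBlockTimeMs argument any more: append(tp, key, value, callback)
            accum.append(new TopicPartition("test", 0), "key".getBytes(), "value".getBytes(), null);
        }
    }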

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index bcf6a3a..aa44991 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -44,11 +44,10 @@ public class SenderTest {
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
     private static final short ACKS_ALL = -1;
     private static final int MAX_RETRIES = 0;
+    private static final int REQUEST_TIMEOUT_MS = 10000;
     private static final String CLIENT_ID = "clientId";
     private static final String METRIC_GROUP = "producer-metrics";
     private static final double EPS = 0.0001;
-    private static final int MAX_BLOCK_TIMEOUT = 1000;
-    private static final int REQUEST_TIMEOUT = 1000;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
@@ -58,17 +57,17 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
                                        MAX_REQUEST_SIZE,
                                        ACKS_ALL,
                                        MAX_RETRIES,
+                                       REQUEST_TIMEOUT_MS,
                                        metrics,
                                        time,
-                                       CLIENT_ID,
-                                       REQUEST_TIMEOUT);
+                                       CLIENT_ID);
 
     @Before
     public void setup() {
@@ -79,7 +78,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -98,7 +97,7 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
             sender.run(time.milliseconds()); // send produce request
             client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
             sender.run(time.milliseconds());
@@ -120,12 +119,12 @@ public class SenderTest {
                                    MAX_REQUEST_SIZE,
                                    ACKS_ALL,
                                    maxRetries,
+                                   REQUEST_TIMEOUT_MS,
                                    new Metrics(),
                                    time,
-                                   "clientId",
-                                   REQUEST_TIMEOUT);
+                                   "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -142,7 +141,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
 
         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
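
The hunks above also restore the Sender constructor's argument order: the request timeout sits between maxRetries and metrics, and the trailing (clientId, requestTimeout) pair shrinks back to clientId alone. A sketch of the restored call; the dependency wiring and the literal values are illustrative:

    import org.apache.kafka.clients.KafkaClient;
    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.clients.producer.internals.Sender;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.utils.Time;

    final class SenderWiring {
        static Sender newSender(KafkaClient client, Metadata metadata,
                                RecordAccumulator accumulator, Metrics metrics, Time time) {
            return new Sender(client,
                              metadata,
                              accumulator,
                              1024 * 1024,  // maxRequestSize
                              (short) -1,   // acks = all
                              0,            // maxRetries
                              10000,        // requestTimeoutMs, back in this position
                              metrics,
                              time,
                              "clientId");
        }
    }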

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
index 5056c71..df1205c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -100,6 +101,24 @@ public class SSLSelectorTest {
         assertEquals("hello", blockingRequest(node, "hello"));
     }
 
+
+    /**
+     * Validate that the client can intentionally disconnect and reconnect
+     */
+    @Test
+    public void testClientDisconnect() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        selector.disconnect(node);
+        selector.send(createSend(node, "hello1"));
+        selector.poll(10L);
+        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
+        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
+        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
+        blockingConnect(node);
+        assertEquals("hello2", blockingRequest(node, "hello2"));
+    }
+
      /**
      * Tests wrap BUFFER_OVERFLOW  and unwrap BUFFER_UNDERFLOW
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 66ca530..3a684d9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -85,6 +85,23 @@ public class SelectorTest {
     }
 
     /**
+     * Validate that the client can intentionally disconnect and reconnect
+     */
+    @Test
+    public void testClientDisconnect() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        selector.disconnect(node);
+        selector.send(createSend(node, "hello1"));
+        selector.poll(10);
+        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
+        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
+        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
+        blockingConnect(node);
+        assertEquals("hello2", blockingRequest(node, "hello2"));
+    }
+
+    /**
      * Sending a request with one already in flight should result in an exception
      */
     @Test(expected = IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 5a5f963..f83fd9b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -45,6 +45,11 @@ public class MockSelector implements Selectable {
     }
 
     @Override
+    public void disconnect(String id) {
+        this.disconnected.add(id);
+    }
+
+    @Override
     public void wakeup() {
     }
 
@@ -54,7 +59,6 @@ public class MockSelector implements Selectable {
 
     @Override
     public void close(String id) {
-        this.disconnected.add(id);
     }
 
     public void clear() {
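
The MockSelector change swaps where a disconnect is recorded: disconnect(id) now adds the id to the disconnected list, while close(id) becomes a no-op. A small sketch of the resulting test-double behavior (node ids are illustrative):

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.test.MockSelector;

    public class MockSelectorSketch {
        public static void main(String[] args) {
            MockSelector selector = new MockSelector(new MockTime());
            selector.disconnect("0"); // now recorded in disconnected()
            selector.close("1");      // no longer recorded
            System.out.println(selector.disconnected()); // expected: [0]
        }
    }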

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index cf79096..b1cf668 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -106,8 +106,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         1,
         0,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        Selectable.USE_DEFAULT_BUFFER_SIZE,
-        config.requestTimeoutMs
+        Selectable.USE_DEFAULT_BUFFER_SIZE
       )
     }
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1b3dfc3..1e8b233 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -64,7 +64,6 @@ object Defaults {
   val MaxConnectionsPerIp: Int = Int.MaxValue
   val MaxConnectionsPerIpOverrides: String = ""
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
-  val RequestTimeoutMs = ControllerSocketTimeoutMs
 
   /** ********* Log Configuration ***********/
   val NumPartitions = 1
@@ -198,7 +197,6 @@ object KafkaConfig {
   val NumIoThreadsProp = "num.io.threads"
   val BackgroundThreadsProp = "background.threads"
   val QueuedMaxRequestsProp = "queued.max.requests"
-  val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
   /** ********* Socket Server Configuration ***********/
@@ -342,7 +340,6 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
   val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
-  val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
   /** ********* Socket Server Configuration ***********/
@@ -518,7 +515,6 @@ object KafkaConfig {
       .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
       .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
       .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
-      .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -697,7 +693,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
   val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
   val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
-  val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
 
   /************* Authorizer Configuration ***********/
   val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b61b96c..f3f1fa6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -310,8 +310,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           1,
           0,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
-          Selectable.USE_DEFAULT_BUFFER_SIZE,
-          config.requestTimeoutMs)
+          Selectable.USE_DEFAULT_BUFFER_SIZE)
       }
 
       var shutdownSucceeded: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c051038..6c85e52 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,8 +83,7 @@ class ReplicaFetcherThread(name: String,
       1,
       0,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
-      brokerConfig.replicaSocketReceiveBufferBytes,
-      brokerConfig.requestTimeoutMs
+      brokerConfig.replicaSocketReceiveBufferBytes
     )
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index e299f8b..46a68e9 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger
 object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
+
     val logger = Logger.getLogger(getClass)
     val config = new ProducerPerfConfig(args)
     if (!config.isFixedSize)
@@ -194,6 +195,7 @@ object ProducerPerformance extends Logging {
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
         props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
+        props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
         props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
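
The restored line above re-enables the old producer ack timeout, ProducerConfig.TIMEOUT_CONFIG ("timeout.ms"), in place of the KIP-19 request timeout. A hedged Java sketch of the same setting; the broker address, acks, and timeout value are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class PerfConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "-1");
            // Upper bound on how long the broker waits for the requested acks
            props.put(ProducerConfig.TIMEOUT_CONFIG, "3000");
        }
    }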

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 9ed9d29..ad10721 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -75,7 +75,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * care.
    */
   def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
-    client.send(request, time.milliseconds())
+    client.send(request)
 
     pollUntilFound(timeout) { case (responses, _) =>
       val response = responses.find { response =>
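
With the revert, NetworkClient.send() takes only the request again (no "now" timestamp). A rough Java rendering of the send-then-poll pattern this Scala helper implements; the method name and deadline loop are illustrative, not the helper's exact logic:

    import java.util.List;

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.utils.Time;

    final class BlockingSendSketch {
        // Send a request, then poll until its response arrives or timeoutMs elapses.
        static ClientResponse sendAndReceive(NetworkClient client, ClientRequest request,
                                             long timeoutMs, Time time) {
            client.send(request); // single-argument form restored by this revert
            long deadline = time.milliseconds() + timeoutMs;
            while (time.milliseconds() < deadline) {
                List<ClientResponse> responses = client.poll(deadline - time.milliseconds(),
                                                             time.milliseconds());
                for (ClientResponse response : responses)
                    if (response.request() == request)
                        return response; // found: the Scala helper returns Some(response)
            }
            return null; // timed out: the Scala helper returns None
        }
    }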

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index e90818a..1198df0 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -144,6 +144,49 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   }
 
   /**
+   * 1. With ack=0, the future metadata should not be blocked.
+   * 2. With ack=1, the future metadata should block,
+   *    and subsequent calls will eventually cause buffer full
+   */
+  @Test
+  def testNoResponse() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+
+    // first send a message to make sure the metadata is refreshed
+    val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
+    producer1.send(record1).get
+    producer2.send(record1).get
+
+    // stop IO threads and request handling, but leave networking operational
+    // any requests should be accepted and queue up, but not handled
+    servers.foreach(server => server.requestHandlerPool.shutdown())
+
+    producer1.send(record1).get(5000, TimeUnit.MILLISECONDS)
+
+    intercept[TimeoutException] {
+      producer2.send(record1).get(5000, TimeUnit.MILLISECONDS)
+    }
+
+    // TODO: expose producer configs after creating them
+    // send enough messages to get buffer full
+    val tooManyRecords = 10
+    val msgSize = producerBufferSize / tooManyRecords
+    val value = new Array[Byte](msgSize)
+    new Random().nextBytes(value)
+    val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value)
+
+    intercept[KafkaException] {
+      for (i <- 1 to tooManyRecords)
+        producer2.send(record2)
+    }
+
+    // do not close produce2 since it will block
+    // TODO: can we do better?
+    producer2 = null
+  }
+
+  /**
    *  The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException
    */
   @Test
@@ -244,8 +287,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     } catch {
       case e: ExecutionException =>
         if (!e.getCause.isInstanceOf[NotEnoughReplicasException]  &&
-            !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] &&
-            !e.getCause.isInstanceOf[TimeoutException]) {
+            !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException]) {
           fail("Expected NotEnoughReplicasException or NotEnoughReplicasAfterAppendException
when producing to topic " +
             "with fewer brokers than min.insync.replicas, but saw " + e.getCause)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bfec426..5b4f2db 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -404,7 +404,6 @@ class KafkaConfigTest {
         case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
-        case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dbeb71a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ff85bf5..09b8444 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -410,6 +410,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
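
The restored block.on.buffer.full flag, which the TestUtils helper above now threads through from its blockOnBufferFull parameter, controls whether KafkaProducer.send() blocks or throws once the producer's memory buffer is exhausted. A minimal configuration sketch; the buffer size and the "false" choice are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BufferFullConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "65536");
            // false: an exhausted buffer raises an exception instead of blocking send()
            props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
        }
    }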

