kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-8635; Skip client poll in Sender loop when no request is sent (#7085)
Date: Thu, 18 Jul 2019 22:08:07 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e26a46  KAFKA-8635; Skip client poll in Sender loop when no request is sent (#7085)
2e26a46 is described below

commit 2e26a46358d97112b9b912a7c5c29a2d6fb517cf
Author: Bob Barrett <bob.barrett@confluent.io>
AuthorDate: Thu Jul 18 15:07:42 2019 -0700

    KAFKA-8635; Skip client poll in Sender loop when no request is sent (#7085)
    
    This patch changes maybeSendTransactionalRequest to handle both sending and polling transactional
requests (and renames it to maybeSendAndPollTransactionalRequest), and skips the call to poll
if no request is actually sent. It also removes the inner loop inside maybeSendAndPollTransactionalRequest
and relies on the main Sender loop for retries.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
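
In effect, the Sender loop now delegates both the send and the poll for transactional requests to a single method and returns early from that iteration, roughly along the lines of the simplified sketch below (illustrative only; error handling, metrics and the produce path are elided, and the surrounding structure is paraphrased rather than quoted from the diff):

    // Simplified sketch of the reshaped control flow (not the exact production code).
    void runOnce() {
        if (transactionManager != null) {
            // ... fatal/abortable-error handling elided ...
            if (maybeSendAndPollTransactionalRequest()) {
                // The method above sends and/or polls as needed; when it only enqueues a
                // coordinator lookup, no extra client.poll(retryBackoffMs, ...) happens here.
                return;
            }
        }
        // Normal produce path: drain the accumulator, send produce requests, then poll once.
        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs);
    }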
---
 .../kafka/clients/producer/internals/Sender.java   | 96 ++++++++++++----------
 .../clients/producer/internals/SenderTest.java     | 44 ++++++----
 2 files changed, 81 insertions(+), 59 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 919be28..dfa6424 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -302,9 +303,7 @@ public class Sender implements Runnable {
                     transactionManager.transitionToFatalError(
                         new KafkaException("The client hasn't received acknowledgment for
" +
                             "some previously sent messages and can no longer retry them.
It isn't safe to continue."));
-                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest())
{
-                    // as long as there are outstanding transactional requests, we simply
wait for them to return
-                    client.poll(retryBackoffMs, time.milliseconds());
+                } else if (maybeSendAndPollTransactionalRequest()) {
                     return;
                 }
 
@@ -412,7 +411,16 @@ public class Sender implements Runnable {
         return pollTimeout;
     }
 
-    private boolean maybeSendTransactionalRequest() {
+    /**
+     * Returns true if a transactional request is sent or polled, or if a FindCoordinator request is enqueued
+     */
+    private boolean maybeSendAndPollTransactionalRequest() {
+        if (transactionManager.hasInFlightTransactionalRequest()) {
+            // as long as there are outstanding transactional requests, we simply wait for them to return
+            client.poll(retryBackoffMs, time.milliseconds());
+            return true;
+        }
+
         if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
             if (transactionManager.isAborting())
                 accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted"));
@@ -429,48 +437,43 @@ public class Sender implements Runnable {
             return false;
 
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
-        while (!forceClose) {
-            Node targetNode = null;
-            try {
-                if (nextRequestHandler.needsCoordinator()) {
-                    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
-                    if (targetNode == null) {
-                        transactionManager.lookupCoordinator(nextRequestHandler);
-                        break;
-                    }
-                    if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeoutMs)) {
-                        transactionManager.lookupCoordinator(nextRequestHandler);
-                        break;
-                    }
-                } else {
-                    targetNode = awaitLeastLoadedNodeReady(requestTimeoutMs);
-                }
-
-                if (targetNode != null) {
-                    if (nextRequestHandler.isRetry())
-                        time.sleep(nextRequestHandler.retryBackoffMs());
-                    long currentTimeMs = time.milliseconds();
-                    ClientRequest clientRequest = client.newClientRequest(
-                        targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
-                    log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
-                    client.send(clientRequest, currentTimeMs);
-                    transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
-                    return true;
-                }
-            } catch (IOException e) {
-                log.debug("Disconnect from {} while trying to send request {}. Going " +
-                        "to back off and retry.", targetNode, requestBuilder, e);
-                if (nextRequestHandler.needsCoordinator()) {
-                    // We break here so that we pick up the FindCoordinator request immediately.
-                    transactionManager.lookupCoordinator(nextRequestHandler);
-                    break;
-                }
+        Node targetNode = null;
+        try {
+            targetNode = awaitNodeReady(nextRequestHandler.coordinatorType());
+            if (targetNode == null) {
+                lookupCoordinatorAndRetry(nextRequestHandler);
+                return true;
             }
+
+            if (nextRequestHandler.isRetry())
+                time.sleep(nextRequestHandler.retryBackoffMs());
+            long currentTimeMs = time.milliseconds();
+            ClientRequest clientRequest = client.newClientRequest(
+                targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
+            log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
+            client.send(clientRequest, currentTimeMs);
+            transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
+            client.poll(retryBackoffMs, time.milliseconds());
+            return true;
+        } catch (IOException e) {
+            log.debug("Disconnect from {} while trying to send request {}. Going " +
+                    "to back off and retry.", targetNode, requestBuilder, e);
+            // We break here so that we pick up the FindCoordinator request immediately.
+            lookupCoordinatorAndRetry(nextRequestHandler);
+            return true;
+        }
+    }
+
+    private void lookupCoordinatorAndRetry(TransactionManager.TxnRequestHandler nextRequestHandler) {
+        if (nextRequestHandler.needsCoordinator()) {
+            transactionManager.lookupCoordinator(nextRequestHandler);
+        } else {
+            // For non-coordinator requests, sleep here to prevent a tight loop when no node is available
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
+
         transactionManager.retry(nextRequestHandler);
-        return true;
     }
 
     private void maybeAbortBatches(RuntimeException exception) {
@@ -513,9 +516,12 @@ public class Sender implements Runnable {
         return NetworkClientUtils.sendAndReceive(client, request, time);
     }
 
-    private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException {
-        Node node = client.leastLoadedNode(time.milliseconds());
-        if (node != null && NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs))
{
+    private Node awaitNodeReady(FindCoordinatorRequest.CoordinatorType coordinatorType) throws
IOException {
+        Node node = coordinatorType != null ?
+                transactionManager.coordinator(coordinatorType) :
+                client.leastLoadedNode(time.milliseconds());
+
+        if (node != null && NetworkClientUtils.awaitReady(client, node, time, requestTimeoutMs))
{
             return node;
         }
         return null;
@@ -525,7 +531,7 @@ public class Sender implements Runnable {
         while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError())
{
             Node node = null;
             try {
-                node = awaitLeastLoadedNodeReady(requestTimeoutMs);
+                node = awaitNodeReady(null);
                 if (node != null) {
                     ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
                     InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5ae76c3..194176d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -113,6 +113,8 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class SenderTest {
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
@@ -121,6 +123,7 @@ public class SenderTest {
     private static final double EPS = 0.0001;
     private static final int MAX_BLOCK_TIMEOUT = 1000;
     private static final int REQUEST_TIMEOUT = 1000;
+    private static final long RETRY_BACKOFF_MS = 50;
 
     private TopicPartition tp0 = new TopicPartition("test", 0);
     private TopicPartition tp1 = new TopicPartition("test", 1);
@@ -315,7 +318,7 @@ public class SenderTest {
         metrics = new Metrics(new MetricConfig().tags(clientTags));
         SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                1, metricsRegistry, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
 
         // Append a message so that topic metrics are created
         accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
@@ -343,7 +346,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.runOnce(); // connect
@@ -401,7 +404,7 @@ public class SenderTest {
 
         try {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                    senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                    senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
             client.prepareMetadataUpdate(metadataUpdate1);
@@ -1172,7 +1175,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(),
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1214,7 +1217,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
-            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+            senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(),
             "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1253,7 +1256,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
-            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+            senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(),
             "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1289,7 +1292,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(),
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1778,7 +1781,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1820,7 +1823,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.runOnce();  // connect.
@@ -1859,7 +1862,7 @@ public class SenderTest {
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.runOnce();  // connect.
@@ -2205,7 +2208,7 @@ public class SenderTest {
         try {
             TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50, txnManager, apiVersions);
+                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
 
             ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
             TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1);
@@ -2237,7 +2240,7 @@ public class SenderTest {
         try {
             TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50, txnManager, apiVersions);
+                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
 
             ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
             TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1);
@@ -2268,7 +2271,7 @@ public class SenderTest {
         try {
             TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50, txnManager, apiVersions);
+                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
 
             ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
             TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1);
@@ -2293,6 +2296,19 @@ public class SenderTest {
         }
     }
 
+    @Test
+    public void testDoNotPollWhenNoRequestSent() {
+        client = spy(new MockClient(time, metadata));
+
+        TransactionManager txnManager = new TransactionManager(logContext, "testDoNotPollWhenNoRequestSent", 6000, 100);
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        // doInitTransactions calls sender.doOnce three times, only two requests are sent, so we should only poll twice
+        verify(client, times(2)).poll(eq(RETRY_BACKOFF_MS), anyLong());
+    }
+
     class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
 
         private TransactionResult requiredResult;
@@ -2418,7 +2434,7 @@ public class SenderTest {
                 deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
-                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         metadata.add("test");
         this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));

