kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8677: Simplify the best-effort network client poll to never throw exception (#7613)
Date Fri, 08 Nov 2019 17:03:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 59f063c  KAFKA-8677: Simplify the best-effort network client poll to never throw
exception (#7613)
59f063c is described below

commit 59f063c1f515278285a5623d644dbaf8cedc2633
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Nov 8 09:02:34 2019 -0800

    KAFKA-8677: Simplify the best-effort network client poll to never throw exception (#7613)
    
    Within KafkaConsumer.poll, we have an optimization to try to send the next fetch request
before returning the data in order to pipeline the fetch requests; however, this pollNoWakeup
should NOT throw any exceptions, since at this point the fetch position has been updated.
If an exception is thrown and the callers decide to capture and continue, those records would
never be returned again, causing data loss.
    
    Also fix the flaky test itself.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>,
Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../consumer/internals/ConsumerNetworkClient.java  | 21 ++++++++++++++++
 .../internals/AbstractCoordinatorTest.java         | 29 ----------------------
 .../kafka/api/EndToEndAuthorizationTest.scala      |  3 ++-
 4 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7c6023d..77499ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1246,7 +1246,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                     // NOTE: since the consumed position has already been updated, we must
not allow
                     // wakeups or any other errors to be triggered prior to returning the
fetched records.
                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
-                        client.pollNoWakeup();
+                        client.transmitSends();
                     }
 
                     return this.interceptors.onConsume(new ConsumerRecords<>(records));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 5a6f860..ffaa483 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -304,6 +304,27 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
+     * Poll for network IO in best-effort only trying to transmit the ready-to-send request
+     * Do not check any pending requests or metadata errors so that no exception should ever
+     * be thrown, also no wakeups be triggered and no interrupted exception either.
+     */
+    public void transmitSends() {
+        Timer timer = time.timer(0);
+
+        // do not try to handle any disconnects, prev request failures, metadata exception
etc;
+        // just try once and return immediately
+        lock.lock();
+        try {
+            // send all the requests we can send now
+            trySend(timer.currentTimeMs());
+
+            client.poll(0, timer.currentTimeMs());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
      * Block until all pending requests from the given node have finished.
      * @param node The node to await requests from
      * @param timer Timer bounding how long this method can block
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index d67fae9..6a5b1f0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -34,7 +33,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -498,33 +496,6 @@ public class AbstractCoordinatorTest {
         assertTrue(leaveGroupFuture.exception() instanceof UnknownMemberIdException);
     }
 
-    @Test
-    public void testHandleSingleLeaveGroupRequest() {
-        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
-        mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.LEAVE_GROUP.id, (short)
2, (short) 2));
-
-        LeaveGroupResponse expectedResponse = leaveGroupResponse(Collections.singletonList(
-            new MemberResponse()
-                .setErrorCode(Errors.NONE.code())
-                .setMemberId(memberId)));
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
-        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
-        mockClient.prepareResponse(body -> {
-            if (body instanceof LeaveGroupRequest) {
-                LeaveGroupRequest request = (LeaveGroupRequest) body;
-                return request.data().memberId().equals(memberId)
-                    && request.data().members().isEmpty();
-            } else {
-                return false;
-            }
-        }, expectedResponse);
-
-        coordinator.ensureActiveGroup();
-        RequestFuture<Void> leaveGroupFuture = coordinator.maybeLeaveGroup("test single
leave group");
-        assertTrue(leaveGroupFuture.succeeded());
-    }
-
     private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse)
{
         setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
 
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 6b4b7c6..9a40d69 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -181,6 +181,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
   this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
   this.serverConfig.setProperty(KafkaConfig.ConnectionsMaxReauthMsProp, "1500")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1500")
 
   /**
     * Starts MiniKDC and only then sets up the parent trait.
@@ -363,7 +364,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
 
     // Add ACLs and verify successful produce/consume/describe on first topic
     setReadAndWriteAcls(tp)
-    consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset =
numRecords, topic2)
+    consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset =
1, topic2)
     sendRecords(producer, numRecords, tp)
     consumeRecords(consumer, numRecords, topic = topic)
     val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).values


Mime
View raw message