kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] 02/02: MINOR: Augment log4j to add generation number in performAssign (#7451)
Date Thu, 17 Oct 2019 17:10:32 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f357fc0339c11d9412f433df1920b509c575c31b
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Wed Oct 9 10:34:19 2019 -0700

    MINOR: Augment log4j to add generation number in performAssign (#7451)
    
    Since generation is private in AbstractCoordinator, I need to modify generation()
    to let it return the object directly.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    | 66 ++++++++++++++--------
 .../consumer/internals/ConsumerCoordinator.java    | 11 ++--
 .../internals/AbstractCoordinatorTest.java         |  2 +-
 .../internals/ConsumerCoordinatorTest.java         |  6 +-
 .../runtime/distributed/WorkerCoordinator.java     |  2 +-
 5 files changed, 52 insertions(+), 35 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 057cfb0..3a76276 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -638,7 +638,7 @@ public abstract class AbstractCoordinator implements Closeable {
                                 .setGenerationId(generation.generationId)
                                 .setAssignments(Collections.emptyList())
                 );
-        log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
+        log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
         return sendSyncGroupRequest(requestBuilder);
     }
 
@@ -665,7 +665,7 @@ public abstract class AbstractCoordinator implements Closeable {
                                     .setGenerationId(generation.generationId)
                                     .setAssignments(groupAssignmentList)
                     );
-            log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
+            log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
             return sendSyncGroupRequest(requestBuilder);
         } catch (RuntimeException e) {
             return RequestFuture.failure(e);
@@ -819,36 +819,29 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     /**
-     * Get the current generation state if the group is stable.
-     * @return the current generation or null if the group is unjoined/rebalancing
+     * Get the current generation state, regardless of whether it is currently stable.
+     * Note that the generation information can be updated while we are still in the middle
+     * of a rebalance, after the join-group response is received.
+     *
+     * @return the current generation
      */
     protected synchronized Generation generation() {
-        if (this.state != MemberState.STABLE)
-            return null;
         return generation;
     }
 
-    protected synchronized String memberId() {
-        return generation == null ? JoinGroupRequest.UNKNOWN_MEMBER_ID :
-                generation.memberId;
-    }
-
     /**
-     * Check whether given generation id is matching the record within current generation.
-     * Only using in unit tests.
-     * @param generationId generation id
-     * @return true if the two ids are matching.
+     * Get the current generation state if the group is stable, otherwise return null
+     *
+     * @return the current generation or null
      */
-    final synchronized boolean hasMatchingGenerationId(int generationId) {
-        return generation != null && generation.generationId == generationId;
+    protected synchronized Generation generationIfStable() {
+        if (this.state != MemberState.STABLE)
+            return null;
+        return generation;
     }
 
-    /**
-     * @return true if the current generation's member ID is valid, false otherwise
-     */
-    // Visible for testing
-    final synchronized boolean hasValidMemberId() {
-        return generation != null && generation.hasMemberId();
+    protected synchronized String memberId() {
+        return generation.memberId;
     }
 
     private synchronized void resetGeneration() {
@@ -1348,12 +1341,35 @@ public abstract class AbstractCoordinator implements Closeable {
 
     }
 
-    // For testing only
+    // For testing only below
     public Heartbeat heartbeat() {
         return heartbeat;
     }
 
-    public void setLastRebalanceTime(final long timestamp) {
+    final void setLastRebalanceTime(final long timestamp) {
         lastRebalanceEndMs = timestamp;
     }
+
+    /**
+     * Check whether given generation id is matching the record within current generation.
+     *
+     * @param generationId generation id
+     * @return true if the two ids are matching.
+     */
+    final boolean hasMatchingGenerationId(int generationId) {
+        return generation != Generation.NO_GENERATION && generation.generationId == generationId;
+    }
+
+    final boolean hasUnknownGeneration() {
+        return generation == Generation.NO_GENERATION;
+    }
+
+    /**
+     * @return true if the current generation's member ID is valid, false otherwise
+     */
+    final boolean hasValidMemberId() {
+        return generation != Generation.NO_GENERATION && generation.hasMemberId();
+    }
+
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6b39acb..bceb9b8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -582,7 +582,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         assignmentSnapshot = metadataSnapshot;
 
-        log.debug("Finished assignment for group: {}", assignments);
+        log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
         for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
@@ -764,8 +764,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                                                         final Timer timer) {
         if (partitions.isEmpty()) return Collections.emptyMap();
 
-        final Generation generation = generation();
-        if (pendingCommittedOffsetRequest != null && !pendingCommittedOffsetRequest.sameRequest(partitions, generation)) {
+        final Generation generationForOffsetRequest = generationIfStable();
+        if (pendingCommittedOffsetRequest != null &&
+            !pendingCommittedOffsetRequest.sameRequest(partitions, generationForOffsetRequest)) {
             // if we were waiting for a different request, then just clear it.
             pendingCommittedOffsetRequest = null;
         }
@@ -779,7 +780,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 future = pendingCommittedOffsetRequest.response;
             } else {
                 future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generation, future);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
 
             }
             client.poll(future, timer);
@@ -1039,7 +1040,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         final Generation generation;
         if (subscriptions.partitionsAutoAssigned()) {
-            generation = generation();
+            generation = generationIfStable();
             // if the generation is null, we are not part of an active group (and we expect to be).
             // the only thing we can do is fail the commit and let the user rejoin the group in poll()
             if (generation == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 66786ca..d67fae9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -413,7 +413,7 @@ public class AbstractCoordinatorTest {
         assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
         assertEquals(Errors.UNKNOWN_MEMBER_ID.message(), future.exception().getMessage());
         assertTrue(coordinator.rejoinNeededOrPending());
-        assertTrue(coordinator.hasMatchingGenerationId(defaultGeneration));
+        assertTrue(coordinator.hasUnknownGeneration());
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 930eb54..fd3411b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -987,7 +987,7 @@ public class ConsumerCoordinatorTest {
         coordinator.maybeLeaveGroup("test maybe leave group");
         assertTrue(received.get());
 
-        AbstractCoordinator.Generation generation = coordinator.generation();
+        AbstractCoordinator.Generation generation = coordinator.generationIfStable();
         assertNull(generation);
     }
 
@@ -1865,7 +1865,7 @@ public class ConsumerCoordinatorTest {
 
         AbstractCoordinator.Generation expectedGeneration = new AbstractCoordinator.Generation(1, consumerId, partitionAssignor.name());
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(expectedGeneration, coordinator.generation());
+        assertEquals(expectedGeneration, coordinator.generationIfStable());
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
 
@@ -1873,7 +1873,7 @@ public class ConsumerCoordinatorTest {
             new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
 
         assertTrue(coordinator.rejoinNeededOrPending());
-        assertEquals(expectedGeneration, coordinator.generation());
+        assertEquals(expectedGeneration, coordinator.generationIfStable());
     }
 
     @Test(expected = KafkaException.class)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index ff47e36..79940e0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -221,7 +221,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
 
     @Override
     public String memberId() {
-        Generation generation = generation();
+        Generation generation = generationIfStable();
         if (generation != null)
             return generation.memberId;
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;


Mime
View raw message