kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5364; Don't fail producer if drained partition is not yet in transaction
Date Fri, 02 Jun 2017 07:53:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 07cfcc53b -> da6da5356


KAFKA-5364; Don't fail producer if drained partition is not yet in transaction

Due to the async nature of the producer, it is possible to attempt to drain a message whose
partition hasn't been added to the transaction yet. Before this patch, we considered this
a fatal error. However, it is only an error if the partition isn't in the queue of partitions
waiting to be sent to the coordinator.

This patch updates the logic so that we only fail the producer if the partition would never
be added to the transaction. If the batch's partition has yet to be added, we simply wait
for the partition to be added to the transaction before sending the batch to the broker.
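
In other words, drain now distinguishes three cases: the partition is already in the
transaction (drain the batch), it is pending addition (hold the batch), or it will never
be added (fail the producer). A minimal sketch of that decision, condensed from the
TransactionManager diff below; the standalone mayDrain helper and its boolean parameters
are illustrative, not part of the patch:

    // Illustrative only: condenses the new ensurePartitionAdded logic into
    // a single decision. Returns true if the batch may be drained now.
    boolean mayDrain(boolean fatalError, boolean inTransactionOrAbortable,
                     boolean partitionInTransaction, boolean partitionPending) {
        if (fatalError)
            return false;                  // producer has already failed
        if (inTransactionOrAbortable) {
            if (!partitionInTransaction && !partitionPending)
                return false;              // would never be added: fatal error
            return partitionInTransaction; // pending partitions wait their turn
        }
        return true;                       // non-transactional sends are unaffected
    }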

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3202 from apurvam/KAFKA-5364-ensure-partitions-added-to-txn-before-send

(cherry picked from commit 673ab671e6d72f48fcc98de0b73564983c34e752)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da6da535
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da6da535
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da6da535

Branch: refs/heads/0.11.0
Commit: da6da53563066519c587c751c1b16b65760fe096
Parents: 07cfcc5
Author: Apurva Mehta <apurva@confluent.io>
Authored: Fri Jun 2 00:53:21 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Jun 2 00:53:39 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  |  34 +++++-
 .../internals/TransactionManagerTest.java       | 111 ++++++++++++++++++-
 2 files changed, 139 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da6da535/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 9d9deac..0a69e02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -234,10 +234,17 @@ public class TransactionManager {
     }
 
     public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
-        if (isInTransaction() && !partitionsInTransaction.contains(tp)) {
-            transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
-                    "for partition " + tp + ", which hasn't been added to the transaction yet"));
+        if (hasFatalError())
             return false;
+        if (isInTransaction() || hasAbortableError()) {
+            // We should enter this branch in an error state because if this partition is already in the transaction,
+            // there is a chance that the corresponding batch is in retry. So we must let it completely flush.
+            if (!(partitionsInTransaction.contains(tp) || isPartitionPending(tp))) {
+                transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
+                        "for partition " + tp + ", which would never be added to the transaction."));
+                return false;
+            }
+            return partitionsInTransaction.contains(tp);
         }
         return true;
     }
@@ -416,6 +423,16 @@ public class TransactionManager {
         return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
+    // visible for testing.
+    boolean hasFatalError() {
+        return currentState == State.FATAL_ERROR;
+    }
+
+    // visible for testing.
+    boolean hasAbortableError() {
+        return currentState == State.ABORTABLE_ERROR;
+    }
+
     // visible for testing
     synchronized boolean transactionContainsPartition(TopicPartition topicPartition) {
         return isInTransaction() && partitionsInTransaction.contains(topicPartition);
@@ -431,6 +448,10 @@ public class TransactionManager {
         return isTransactional() && currentState == State.READY;
     }
 
+    private synchronized boolean isPartitionPending(TopicPartition tp) {
+        return isInTransaction() && (pendingPartitionsInTransaction.contains(tp) || newPartitionsInTransaction.contains(tp));
+    }
+
     private void transitionTo(State target) {
         transitionTo(target, null);
     }
@@ -448,7 +469,12 @@ public class TransactionManager {
             lastError = null;
         }
 
-        log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
+        if (lastError != null)
+            log.error("{}Transition from state {} to error state {}", logPrefix, currentState,
+                    target, lastError);
+        else
+            log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
+
         currentState = target;
     }
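
On the accumulator side (not shown in this diff), the drain loop consults this return
value before handing a batch to the sender. A hypothetical caller-side sketch; the loop
shape and the readyBatches/drained names are illustrative, not taken from the patch:

    // Hypothetical caller-side sketch: skip batches the transaction
    // manager is not yet willing to drain.
    for (ProducerBatch batch : readyBatches) {
        if (transactionManager != null
                && !transactionManager.ensurePartitionAdded(batch.topicPartition))
            continue;  // partition pending or producer failed: don't drain now
        drained.add(batch);
    }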
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da6da535/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 3e3f785..96eae8f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -60,11 +61,15 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -87,8 +92,9 @@ public class TransactionManagerTest {
     private final String transactionalId = "foobar";
     private final int transactionTimeoutMs = 1121;
 
-    private TopicPartition tp0 = new TopicPartition("test", 0);
-    private TopicPartition tp1 = new TopicPartition("test", 1);
+    private final String topic = "test";
+    private TopicPartition tp0 = new TopicPartition(topic, 0);
+    private TopicPartition tp1 = new TopicPartition(topic, 1);
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
 
@@ -160,9 +166,11 @@ public class TransactionManagerTest {
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
         assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertFalse(transactionManager.ensurePartitionAdded(tp0));
         sender.run(time.milliseconds());  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
+        assertTrue(transactionManager.ensurePartitionAdded(tp0));
         assertFalse(responseFuture.isDone());
 
         sender.run(time.milliseconds());  // send produce request.
@@ -884,6 +892,105 @@ public class TransactionManagerTest {
         assertTrue(abortResult.isSuccessful());
     }
 
+    @Test
+    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
+        transactionManager.maybeAddPartitionToTransaction(tp1);
+        accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
+
+        assertFalse(transactionManager.ensurePartitionAdded(tp0));
+        assertFalse(transactionManager.ensurePartitionAdded(tp1));
+
+        Node node1 = new Node(0, "localhost", 1111);
+        Node node2 = new Node(1, "localhost", 1112);
+        PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
+        PartitionInfo part2 = new PartitionInfo(topic, 1, node2, null, null);
+
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2),
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+        Set<Node> nodes = new HashSet<>();
+        nodes.add(node1);
+        nodes.add(node2);
+        Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
+                time.milliseconds());
+
+        // We shouldn't drain batches which haven't been added to the transaction yet.
+        assertTrue(drainedBatches.containsKey(node1.id()));
+        assertTrue(drainedBatches.get(node1.id()).isEmpty());
+        assertTrue(drainedBatches.containsKey(node2.id()));
+        assertTrue(drainedBatches.get(node2.id()).isEmpty());
+        assertFalse(transactionManager.hasError());
+    }
+
+    @Test
+    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp1);
+        prepareAddPartitionsToTxn(tp1, Errors.NONE);
+        sender.run(time.milliseconds());  // Send AddPartitions, tp1 should be in the transaction now.
+
+        assertTrue(transactionManager.transactionContainsPartition(tp1));
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
+        sender.run(time.milliseconds());  // Send AddPartitions, should be in abortable state.
+
+        assertTrue(transactionManager.hasAbortableError());
+        assertTrue(transactionManager.ensurePartitionAdded(tp1));
+
+        // Try to drain a message destined for tp1; it should get drained.
+        Node node1 = new Node(1, "localhost", 1112);
+        PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+        accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
+        Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1),
+                Integer.MAX_VALUE,
+                time.milliseconds());
+
+        // We should drain the appended record since we are in abortable state and the partition has already been
+        // added to the transaction.
+        assertTrue(drainedBatches.containsKey(node1.id()));
+        assertEquals(1, drainedBatches.get(node1.id()).size());
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
+        Node node1 = new Node(0, "localhost", 1111);
+        PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
+
+        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+        Set<Node> nodes = new HashSet<>();
+        nodes.add(node1);
+        Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
+                time.milliseconds());
+
+        // We shouldn't drain batches which haven't been added to the transaction yet.
+        assertTrue(drainedBatches.containsKey(node1.id()));
+        assertTrue(drainedBatches.get(node1.id()).isEmpty());
+        assertTrue(transactionManager.hasFatalError());
+    }
+
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

