kafka-commits mailing list archives

From jjko...@apache.org
Subject kafka git commit: KAFKA-3197; Fix producer sending records out of order
Date Tue, 08 Mar 2016 22:07:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f6e35dec9 -> 5afa16601


KAFKA-3197; Fix producer sending records out of order

This patch reuses max.in.flight.requests.per.connection. When it is set to one, we take it to
mean that the user wants order protection. The approach is to ensure there is at most one
batch per partition in flight at any time.
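
For context, a minimal sketch of how an application opts into this protection; the bootstrap
servers, serializers, and topic name below are illustrative assumptions, not part of this patch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class OrderedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Setting this to 1 is what the patch interprets as a request for order protection:
            // the sender then keeps at most one in-flight batch per partition.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            // Retries are what could previously reorder records when more requests were in flight.
            props.put(ProducerConfig.RETRIES_CONFIG, 3);

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test", "key".getBytes(), "value".getBytes()));
            }
        }
    }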

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Aditya Auradkar <aauradkar@linkedin.com>, Jason Gustafson <jason@confluent.io>,
Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Joel Koshy
<jjkoshy.w@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #857 from becketqin/KAFKA-3197


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5afa1660
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5afa1660
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5afa1660

Branch: refs/heads/trunk
Commit: 5afa1660103df8aecbf7558e761418944fb5905a
Parents: f6e35de
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Tue Mar 8 14:06:53 2016 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Mar 8 14:06:53 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  1 +
 .../producer/internals/RecordAccumulator.java   | 84 +++++++++++++-------
 .../clients/producer/internals/Sender.java      | 17 +++-
 .../internals/RecordAccumulatorTest.java        | 37 ++++++++-
 .../clients/producer/internals/SenderTest.java  | 51 ++++++++++++
 .../java/org/apache/kafka/test/TestUtils.java   |  2 +-
 6 files changed, 156 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a066512..85ba9ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -287,6 +287,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
+                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                     (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                     config.getInt(ProducerConfig.RETRIES_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index f1414f0..beaa832 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,7 +59,6 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
-    private int drainIndex;
     private final AtomicInteger flushesInProgress;
     private final AtomicInteger appendsInProgress;
     private final int batchSize;
@@ -70,6 +69,9 @@ public final class RecordAccumulator {
     private final Time time;
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
     private final IncompleteRecordBatches incomplete;
+    // The following variables are only accessed by the sender thread, so we don't need to protect them.
+    private final Set<TopicPartition> muted;
+    private int drainIndex;
 
 
     /**
@@ -105,6 +107,7 @@ public final class RecordAccumulator {
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
         this.incomplete = new IncompleteRecordBatches();
+        this.muted = new HashSet<TopicPartition>();
         this.time = time;
         registerMetrics(metrics, metricGrpName);
     }
@@ -213,7 +216,6 @@ public final class RecordAccumulator {
         List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
-            TopicPartition topicAndPartition = entry.getKey();
             Deque<RecordBatch> dq = entry.getValue();
             synchronized (dq) {
                 // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut
@@ -259,14 +261,20 @@ public final class RecordAccumulator {
      * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
      * partition batches.
      * <p>
-     * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the
-     * following are true :
+     * A destination node is ready to send data if:
      * <ol>
-     * <li>The record set is full
-     * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
-     * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
-     * immediately considered ready).
-     * <li>The accumulator has been closed
+     * <li>There is at least one partition that is not backing off its send
+     * <li><b>and</b> those partitions are not muted (to prevent reordering if
+     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
+     *   is set to one)</li>
+     * <li><b>and <i>any</i></b> of the following are true</li>
+     * <ul>
+     *     <li>The record set is full</li>
+     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
+     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
+     *     are immediately considered ready).</li>
+     *     <li>The accumulator has been closed</li>
+     * </ul>
      * </ol>
      */
     public ReadyCheckResult ready(Cluster cluster, long nowMs) {
@@ -282,7 +290,7 @@ public final class RecordAccumulator {
             Node leader = cluster.leaderFor(part);
             if (leader == null) {
                 unknownLeadersExist = true;
-            } else if (!readyNodes.contains(leader)) {
+            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                 synchronized (deque) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
@@ -333,7 +341,10 @@ public final class RecordAccumulator {
      * @param now The current unix time in milliseconds
      * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
      */
-    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
+    public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
+                                                 Set<Node> nodes,
+                                                 int maxSize,
+                                                 long now) {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
@@ -346,25 +357,29 @@ public final class RecordAccumulator {
             int start = drainIndex = drainIndex % parts.size();
             do {
                 PartitionInfo part = parts.get(drainIndex);
-                Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
-                if (deque != null) {
-                    synchronized (deque) {
-                        RecordBatch first = deque.peekFirst();
-                        if (first != null) {
-                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
-                            // Only drain the batch if it is not during backoff period.
-                            if (!backoff) {
-                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                                    // there is a rare case that a single batch size is larger than the request size due
-                                    // to compression; in this case we will still eventually send this batch in a single
-                                    // request
-                                    break;
-                                } else {
-                                    RecordBatch batch = deque.pollFirst();
-                                    batch.records.close();
-                                    size += batch.records.sizeInBytes();
-                                    ready.add(batch);
-                                    batch.drainedMs = now;
+                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
+                // Only proceed if the partition has no in-flight batches.
+                if (!muted.contains(tp)) {
+                    Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
+                    if (deque != null) {
+                        synchronized (deque) {
+                            RecordBatch first = deque.peekFirst();
+                            if (first != null) {
+                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+                                // Only drain the batch if it is not during backoff period.
+                                if (!backoff) {
+                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                        // there is a rare case that a single batch size is larger than the request size due
+                                        // to compression; in this case we will still eventually send this batch in a single
+                                        // request
+                                        break;
+                                    } else {
+                                        RecordBatch batch = deque.pollFirst();
+                                        batch.records.close();
+                                        size += batch.records.sizeInBytes();
+                                        ready.add(batch);
+                                        batch.drainedMs = now;
+                                    }
                                 }
                             }
                         }
@@ -465,6 +480,14 @@ public final class RecordAccumulator {
         }
     }
 
+    public void mutePartition(TopicPartition tp) {
+        muted.add(tp);
+    }
+
+    public void unmutePartition(TopicPartition tp) {
+        muted.remove(tp);
+    }
+
     /**
      * Close this accumulator and force all the record buffers to be drained
      */
@@ -532,4 +555,5 @@ public final class RecordAccumulator {
             }
         }
     }
+
 }
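
A minimal standalone sketch of the mute/unmute idea introduced above, for readers skimming the
diff; the class and method names here are illustrative only and are not the actual Kafka
internals (the real logic lives in RecordAccumulator.ready()/drain() and Sender, as shown):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Simplified model of "at most one in-flight batch per partition".
    public class MiniAccumulator {
        private final Map<String, Deque<String>> batches = new HashMap<>();
        // Partitions with a batch currently in flight; only touched by the sender thread.
        private final Set<String> muted = new HashSet<>();

        public void append(String partition, String batch) {
            batches.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(batch);
        }

        // Drain skips muted partitions, so a retried batch can never be overtaken.
        public String drain(String partition) {
            Deque<String> dq = batches.get(partition);
            if (muted.contains(partition) || dq == null || dq.isEmpty())
                return null;
            muted.add(partition);    // mute once a batch becomes in-flight
            return dq.pollFirst();
        }

        // Called when the produce response (success or retriable error) has been handled.
        public void completed(String partition) {
            muted.remove(partition); // unmute so the next batch for this partition can go out
        }
    }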

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9d24d07..db8918c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -67,6 +67,9 @@ public class Sender implements Runnable {
     /* the metadata for the client */
     private final Metadata metadata;
 
+    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
+    private final boolean guaranteeMessageOrder;
+
     /* the maximum request size to attempt to send to the server */
     private final int maxRequestSize;
 
@@ -97,6 +100,7 @@ public class Sender implements Runnable {
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
+                  boolean guaranteeMessageOrder,
                   int maxRequestSize,
                   short acks,
                   int retries,
@@ -107,6 +111,7 @@ public class Sender implements Runnable {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
+        this.guaranteeMessageOrder = guaranteeMessageOrder;
         this.maxRequestSize = maxRequestSize;
         this.running = true;
         this.acks = acks;
@@ -164,7 +169,7 @@ public class Sender implements Runnable {
      * @param now
      *            The current POSIX time in milliseconds
      */
-    public void run(long now) {
+    void run(long now) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
@@ -189,6 +194,13 @@ public class Sender implements Runnable {
                                                                          result.readyNodes,
                                                                          this.maxRequestSize,
                                                                          now);
+        if (guaranteeMessageOrder) {
+            // Mute all the partitions drained
+            for (List<RecordBatch> batchList : batches.values()) {
+                for (RecordBatch batch : batchList)
+                    this.accumulator.mutePartition(batch.topicPartition);
+            }
+        }
 
         List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
         // update sensors
@@ -304,6 +316,9 @@ public class Sender implements Runnable {
         }
         if (error.exception() instanceof InvalidMetadataException)
             metadata.requestUpdate();
+        // Unmute the completed partition.
+        if (guaranteeMessageOrder)
+            this.accumulator.unmutePartition(batch.topicPartition);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 0f95ee5..3660272 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -14,8 +14,8 @@ package org.apache.kafka.clients.producer.internals;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -39,8 +39,6 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Test;
 
@@ -299,7 +297,6 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testExpiredBatches() throws InterruptedException {
-        Time time = new SystemTime();
         long now = time.milliseconds();
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
         int appends = 1024 / msgSize;
@@ -317,4 +314,36 @@ public class RecordAccumulatorTest {
         List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
         assertEquals(1, expiredBatches.size());
     }
+
+    @Test
+    public void testMutedPartitions() throws InterruptedException {
+        long now = time.milliseconds();
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
+        int appends = 1024 / msgSize;
+        for (int i = 0; i < appends; i++) {
+            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
+        }
+        time.sleep(2000);
+
+        // Test ready with muted partition
+        accum.mutePartition(tp1);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No node should be ready", 0, result.readyNodes.size());
+
+        // Test ready without muted partition
+        accum.unmutePartition(tp1);
+        result = accum.ready(cluster, time.milliseconds());
+        assertTrue("The batch should be ready", result.readyNodes.size() > 0);
+
+        // Test drain with muted partition
+        accum.mutePartition(tp1);
+        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("No batch should have been drained", 0, drained.get(node1.id()).size());
+
+        // Test drain without muted partition.
+        accum.unmutePartition(tp1);
+        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b983de5..fb67747 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
@@ -75,6 +76,7 @@ public class SenderTest {
         sender = new Sender(client,
                             metadata,
                             this.accumulator,
+                            true,
                             MAX_REQUEST_SIZE,
                             ACKS_ALL,
                             MAX_RETRIES,
@@ -134,6 +136,7 @@ public class SenderTest {
             Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
+                                       false,
                                        MAX_REQUEST_SIZE,
                                        ACKS_ALL,
                                        maxRetries,
@@ -178,6 +181,54 @@ public class SenderTest {
         }
     }
 
+    @Test
+    public void testSendInOrder() throws Exception {
+        int maxRetries = 1;
+        Metrics m = new Metrics();
+        try {
+            Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                "clientId",
+                REQUEST_TIMEOUT);
+
+            // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
+            Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
+            metadata.update(cluster1, time.milliseconds());
+
+            // Send the first message.
+            TopicPartition tp2 = new TopicPartition("test", 1);
+            accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, MAX_BLOCK_TIMEOUT);
+            sender.run(time.milliseconds()); // connect
+            sender.run(time.milliseconds()); // send produce request
+            String id = client.requests().peek().request().destination();
+            assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().request().header().apiKey());
+            Node node = new Node(Integer.valueOf(id), "localhost", 0);
+            assertEquals(1, client.inFlightRequestCount());
+            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
+            time.sleep(900);
+            // Now send another message to tp2
+            accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, MAX_BLOCK_TIMEOUT);
+
+            // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
+            Cluster cluster2 = TestUtils.singletonCluster("test", 2);
+            metadata.update(cluster2, time.milliseconds());
+            // Sender should not send the second message to node 0.
+            sender.run(time.milliseconds());
+            assertEquals(1, client.inFlightRequestCount());
+        } finally {
+            m.close();
+        }
+
+    }
+
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
         assertTrue("Request should be completed", future.isDone());
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 7b88f5e..7ffc54a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -52,7 +52,7 @@ public class TestUtils {
     public static Cluster clusterWith(int nodes, String topic, int partitions) {
         Node[] ns = new Node[nodes];
         for (int i = 0; i < nodes; i++)
-            ns[i] = new Node(0, "localhost", 1969);
+            ns[i] = new Node(i, "localhost", 1969);
         List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
         for (int i = 0; i < partitions; i++)
             parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));

