kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2043; CompressionType is passed in each RecordAccumulator append; patched by Grant Henke; reviewed by Jun Rao
Date Mon, 06 Apr 2015 20:34:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9c23d9355 -> 75e1cc8bc


kafka-2043; CompressionType is passed in each RecordAccumulator append; patched by Grant Henke; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75e1cc8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75e1cc8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75e1cc8b

Branch: refs/heads/trunk
Commit: 75e1cc8bc497e6aaa0dd05454d6c817ed0fb5e23
Parents: 9c23d93
Author: Grant Henke <granthenke@gmail.com>
Authored: Mon Apr 6 13:34:31 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Apr 6 13:34:31 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  3 +-
 .../producer/internals/RecordAccumulator.java   |  7 ++--
 .../internals/RecordAccumulatorTest.java        | 34 ++++++++++----------
 .../clients/producer/internals/SenderTest.java  |  8 ++---
 4 files changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ab26342..b91e2c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -216,6 +216,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                                  this.totalMemorySize,
+                                                 this.compressionType,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                                  retryBackoffMs,
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
@@ -376,7 +377,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 88b4e4f..0e7ab29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,6 +59,7 @@ public final class RecordAccumulator {
     private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
     private final int batchSize;
+    private final CompressionType compression;
     private final long lingerMs;
     private final long retryBackoffMs;
     private final BufferPool free;
@@ -71,6 +72,7 @@ public final class RecordAccumulator {
      * 
      * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
+     * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
@@ -84,6 +86,7 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
+                             CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
                              boolean blockOnBufferFull,
@@ -94,6 +97,7 @@ public final class RecordAccumulator {
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
+        this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
@@ -139,10 +143,9 @@ public final class RecordAccumulator {
      * @param tp The topic/partition to which this record is being sent
      * @param key The key for the record
      * @param value The value for the record
-     * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index e379ac8..05e2929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,13 +64,13 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time,  metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time,  metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -88,16 +88,16 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -114,12 +114,12 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
+                accum.append(tp, key, value, null);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -133,14 +133,14 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -174,13 +174,13 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -189,14 +189,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, CompressionType.NONE, null);
+            accum.append(tp3, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, CompressionType.NONE, null);
+            accum.append(tp2, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -207,9 +207,9 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 24274a6..8b1805d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -52,7 +52,7 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
@@ -72,7 +72,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -99,7 +99,7 @@ public class SenderTest {
                                    time,
                                    "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -116,7 +116,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
 
         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());


Mime
View raw message