kafka-commits mailing list archives

From jjko...@apache.org
Subject kafka git commit: KAFKA-1697; Remove support for producer ack > 1 on the broker; reviewed by Joel Koshy
Date Fri, 13 Feb 2015 21:08:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a3d6dcaf1 -> eab4f4c9f


KAFKA-1697; Remove support for producer ack > 1 on the broker; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eab4f4c9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eab4f4c9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eab4f4c9

Branch: refs/heads/trunk
Commit: eab4f4c9f43e20b3d91bf0e2e9be6b6fd72f0acf
Parents: a3d6dca
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Fri Feb 13 13:08:10 2015 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Feb 13 13:08:10 2015 -0800

----------------------------------------------------------------------
 .../errors/InvalidRequiredAcksException.java    | 25 ++++++
 .../NotEnoughReplicasAfterAppendException.java  | 12 ---
 .../apache/kafka/common/protocol/Errors.java    | 19 +----
 .../main/scala/kafka/cluster/Partition.scala    |  2 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     | 85 +++++++++++++-------
 .../api/RequestResponseSerializationTest.scala  |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 26 ++++++
 8 files changed, 111 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
new file mode 100644
index 0000000..9d19b28
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidRequiredAcksException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidRequiredAcksException(String message) {
+        super(message);
+    }
+}
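
For context, here is a hedged client-side sketch (not part of this commit) of how the new error could surface through the Java producer's send() Future. The bootstrap address, topic name, and the AcksErrorDemo class are placeholders. Because InvalidRequiredAcksException extends ApiException rather than RetriableException, a client should treat it as a configuration error, not a transient failure:

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;

public class AcksErrorDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Blocks until the broker acknowledges or returns an error code.
            producer.send(new ProducerRecord<String, String>("test-topic", "key", "value")).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof InvalidRequiredAcksException) {
                // Non-retriable: the request carried an acks value outside {-1, 0, 1}.
                // Fix the client configuration instead of retrying.
            }
        } finally {
            producer.close();
        }
    }
}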

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
index a6107b8..fd7f6d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -23,20 +23,8 @@ package org.apache.kafka.common.errors;
 public class NotEnoughReplicasAfterAppendException extends RetriableException {
     private static final long serialVersionUID = 1L;
 
-    public NotEnoughReplicasAfterAppendException() {
-        super();
-    }
-
-    public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
     public NotEnoughReplicasAfterAppendException(String message) {
         super(message);
     }
 
-    public NotEnoughReplicasAfterAppendException(Throwable cause) {
-        super(cause);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a8deac4..ad2171f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,21 +19,7 @@ package org.apache.kafka.common.protocol;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.NetworkException;
-import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
-import org.apache.kafka.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
-import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.*;
 
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
@@ -70,7 +56,8 @@ public enum Errors {
     NOT_ENOUGH_REPLICAS(19,
             new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
     NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
-            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
+    INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
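
A quick sketch of the two-way mapping this enum provides, assuming its existing code(), forCode(), and exception() accessors (untouched by this commit); the ErrorsRoundTrip class is hypothetical:

import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

public class ErrorsRoundTrip {
    public static void main(String[] args) {
        short code = Errors.INVALID_REQUIRED_ACKS.code();  // 21 on the wire
        Errors error = Errors.forCode(code);               // maps back to INVALID_REQUIRED_ACKS
        // Unlike NOT_ENOUGH_REPLICAS_AFTER_APPEND, the new error is not retriable,
        // so a client should surface it rather than resend the request.
        System.out.println(error.exception() instanceof RetriableException); // prints false
    }
}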

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e6ad8be..bfe4f45 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -317,8 +317,6 @@ class Partition(val topic: String,
           } else {
             (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
           }
-        } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
-          (true, ErrorMapping.NoError)
         } else
           (false, ErrorMapping.NoError)
       case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6ee7d88..703886a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -185,7 +185,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // the producer client will know that some error has happened and will refresh its metadata
         if (errorInResponse) {
           info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0"
-            .format(produceRequest.correlationId, produceRequest.clientId))
+                  .format(produceRequest.correlationId, produceRequest.clientId))
           requestChannel.closeConnection(request.processor, request)
         } else {
           requestChannel.noOperation(request.processor, request)

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fb948b9..ce36cc7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,6 +29,8 @@ import kafka.message.{ByteBufferMessageSet, MessageSet}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
 import java.util.concurrent.TimeUnit
+import org.apache.kafka.common.protocol.Errors
+
 import scala.Predef._
 import scala.collection._
 import scala.collection.mutable.HashMap
@@ -253,43 +255,66 @@ class ReplicaManager(val config: KafkaConfig,
                      messagesPerPartition: Map[TopicAndPartition, MessageSet],
                      responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
 
-    val sTime = SystemTime.milliseconds
-    val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
-    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    if (isValidRequiredAcks(requiredAcks)) {
 
-    val produceStatus = localProduceResults.map{ case (topicAndPartition, result) =>
-      topicAndPartition ->
-        ProducePartitionStatus(
-          result.info.lastOffset + 1, // required offset
-          ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
-    }
+      val sTime = SystemTime.milliseconds
+      val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
+      debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
-    if(requiredAcks == 0 ||
-      requiredAcks == 1 ||
-      messagesPerPartition.size <= 0 ||
-      localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) {
-      // in case of the following we can respond immediately:
-      //
-      // 1. required acks = 0 or 1
-      // 2. there is no data to append
-      // 3. all partition appends have failed
-      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
-      responseCallback(produceResponseStatus)
-    } else {
-      // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-      val delayedProduce =  new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+      val produceStatus = localProduceResults.map { case (topicAndPartition, result) =>
+        topicAndPartition ->
+                ProducePartitionStatus(
+                  result.info.lastOffset + 1, // required offset
+                  ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
+      }
 
-      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
-      val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+      if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
+        // create delayed produce operation
+        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
 
-      // try to complete the request immediately, otherwise put it into the purgatory
-      // this is because while the delayed produce operation is being created, new
-      // requests may arrive and hence make this operation completable.
-      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
+        val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+
+        // try to complete the request immediately, otherwise put it into the purgatory
+        // this is because while the delayed produce operation is being created, new
+        // requests may arrive and hence make this operation completable.
+        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+
+      } else {
+        // we can respond immediately
+        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+        responseCallback(produceResponseStatus)
+      }
+    } else {
+      // If required.acks is outside accepted range, something is wrong with the client
+      // Just return an error and don't handle the request at all
+      val responseStatus = messagesPerPartition.map {
+        case (topicAndPartition, messageSet) =>
+          (topicAndPartition ->
+                  ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code,
+                    LogAppendInfo.UnknownLogAppendInfo.firstOffset))
+      }
+      responseCallback(responseStatus)
     }
   }
 
+  // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
+  //
+  // 1. required acks = -1
+  // 2. there is data to append
+  // 3. at least one partition append was successful (fewer errors than partitions)
+  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet],
+                                       localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = {
+    requiredAcks == -1 &&
+    messagesPerPartition.size > 0 &&
+    localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
+  }
+
+  private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
+    requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
+  }
+
   /**
    * Append the messages to the local replica logs
    */
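
For reference, the producer-side settings that correspond to the three values isValidRequiredAcks accepts; a hedged configuration sketch, assuming the Java producer's string-valued acks setting where "all" is transmitted as -1 (the broker address is a placeholder):

import java.util.Properties;

public class ValidAcksSettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // The only acknowledgement levels the broker still accepts:
        //   "0"   - fire and forget, no broker acknowledgement
        //   "1"   - leader acknowledgement only
        //   "all" - wait for the full in-sync replica set (sent on the wire as -1)
        props.put("acks", "all");
        // Any other wire value (e.g. 2) is now rejected up front with
        // INVALID_REQUIRED_ACKS instead of being treated as "wait for 2 replicas".
    }
}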

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index a1f72f8..fba852a 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -59,7 +59,7 @@ object SerializationTestUtils {
   private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes))
   private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3)
 
-  private val topicDataProducerRequest = {
+  val topicDataProducerRequest = {
     val groupedData = Array(topic1, topic2).flatMap(topic =>
       partitionDataProducerRequestArray.zipWithIndex.map
       {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eab4f4c9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index faa9071..d1ed5c2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,16 +17,21 @@
 
 package kafka.server
 
+import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest}
+import kafka.common.TopicAndPartition
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
 
+import org.apache.kafka.common.protocol.Errors
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 
+import scala.collection.Map
+
 class ReplicaManagerTest extends JUnit3Suite {
 
   val topic = "test-topic"
@@ -63,4 +68,25 @@ class ReplicaManagerTest extends JUnit3Suite {
     // shutdown the replica manager upon test completion
     rm.shutdown(false)
   }
+
+  @Test
+  def testIllegalRequiredAcks() {
+    val props = TestUtils.createBrokerConfig(1)
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
+    def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
+      assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
+    }
+
+    rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
+
+    rm.shutdown(false);
+
+    TestUtils.verifyNonDaemonThreadsStatus
+
+  }
 }

