kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 3.0 updated: KAFKA-12158; Better return type of RaftClient.scheduleAppend (#10909)
Date Mon, 02 Aug 2021 22:07:36 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 271abae  KAFKA-12158; Better return type of RaftClient.scheduleAppend (#10909)
271abae is described below

commit 271abae78667456cd3090e69c87927293ff6f663
Author: dengziming <swzmdeng@163.com>
AuthorDate: Tue Aug 3 05:47:03 2021 +0800

    KAFKA-12158; Better return type of RaftClient.scheduleAppend (#10909)
    
    This patch improves the return type for `scheduleAppend` and `scheduleAtomicAppend`. Previously
we were using a `Long` value and using both `null` and `Long.MaxValue` to distinguish between
different error cases. In this PR, we change the return type to `long` and only return a value
if the append was accepted. For the error cases, we instead throw an exception. For this purpose,
the patch introduces a couple new exception types: `BufferAllocationException` and `NotLeaderException`.
    
    Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason
Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/raft/RaftManager.scala   | 40 +----------
 .../main/scala/kafka/tools/TestRaftServer.scala    | 13 ++--
 .../apache/kafka/controller/QuorumController.java  |  7 +-
 .../org/apache/kafka/metalog/LocalLogManager.java  |  4 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 18 ++---
 .../java/org/apache/kafka/raft/RaftClient.java     | 24 ++++---
 .../org/apache/kafka/raft/ReplicatedCounter.java   | 10 ++-
 .../raft/errors/BufferAllocationException.java     | 29 ++++++++
 .../kafka/raft/errors/NotLeaderException.java      | 38 ++++++++++
 .../apache/kafka/raft/errors/RaftException.java    | 39 ++++++++++
 .../kafka/raft/internals/BatchAccumulator.java     | 35 +++++----
 .../org/apache/kafka/snapshot/SnapshotWriter.java  |  2 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 83 +++++++++++++++++++++-
 13 files changed, 252 insertions(+), 90 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 8696da4..7e63a25 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -91,16 +91,6 @@ trait RaftManager[T] {
     listener: RaftClient.Listener[T]
   ): Unit
 
-  def scheduleAtomicAppend(
-    epoch: Int,
-    records: Seq[T]
-  ): Option[Long]
-
-  def scheduleAppend(
-    epoch: Int,
-    records: Seq[T]
-  ): Option[Long]
-
   def leaderAndEpoch: LeaderAndEpoch
 
   def client: RaftClient[T]
@@ -167,34 +157,6 @@ class KafkaRaftManager[T](
     client.register(listener)
   }
 
-  override def scheduleAtomicAppend(
-    epoch: Int,
-    records: Seq[T]
-  ): Option[Long] = {
-    append(epoch, records, true)
-  }
-
-  override def scheduleAppend(
-    epoch: Int,
-    records: Seq[T]
-  ): Option[Long] = {
-    append(epoch, records, false)
-  }
-
-  private def append(
-    epoch: Int,
-    records: Seq[T],
-    isAtomic: Boolean
-  ): Option[Long] = {
-    val offset = if (isAtomic) {
-      client.scheduleAtomicAppend(epoch, records.asJava)
-    } else {
-      client.scheduleAppend(epoch, records.asJava)
-    }
-
-    Option(offset).map(Long.unbox)
-  }
-
   override def handleRequest(
     header: RequestHeader,
     request: ApiMessage,
@@ -227,7 +189,7 @@ class KafkaRaftManager[T](
       metrics,
       expirationService,
       logContext,
-      metaProperties.clusterId.toString,
+      metaProperties.clusterId,
       OptionalInt.of(config.nodeId),
       raftConfig
     )
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 52ec1e1..774805e 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,7 +19,6 @@ package kafka.tools
 
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
-
 import joptsimple.OptionException
 import kafka.network.SocketServer
 import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -36,6 +35,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
+import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.snapshot.SnapshotReader
@@ -193,10 +193,13 @@ class TestRaftServer(
       currentTimeMs: Long
     ): Unit = {
       recordCount.incrementAndGet()
-
-      raftManager.scheduleAppend(leaderEpoch, Seq(payload)) match {
-        case Some(offset) => pendingAppends.offer(PendingAppend(offset, currentTimeMs))
-        case None => time.sleep(10)
+      try {
+        val offset = raftManager.client.scheduleAppend(leaderEpoch, List(payload).asJava)
+        pendingAppends.offer(PendingAppend(offset, currentTimeMs))
+      } catch {
+        case e: NotLeaderException =>
+          logger.debug(s"Append failed because this node is no longer leader in epoch $leaderEpoch",
e)
+          time.sleep(10)
       }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index dba8817..7c1215d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -580,17 +580,12 @@ public final class QuorumController implements Controller {
                 // written before we can return our result to the user.  Here, we hand off
                 // the batch of records to the raft client.  They will be written out
                 // asynchronously.
-                final Long offset;
+                final long offset;
                 if (result.isAtomic()) {
                     offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
                 } else {
                     offset = raftClient.scheduleAppend(controllerEpoch, result.records());
                 }
-                if (offset == null) {
-                    throw new IllegalStateException("The raft client was unable to allocate
a buffer for an append");
-                } else if (offset == Long.MAX_VALUE) {
-                    throw new IllegalStateException("Unable to append records since this
is not the leader");
-                }
                 op.processBatchEndOffset(offset);
                 writeOffset = offset;
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index eeb7696..531ecc8 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -657,7 +657,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
     }
 
     @Override
-    public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
+    public long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
         if (batch.isEmpty()) {
             throw new IllegalArgumentException("Batch cannot be empty");
         }
@@ -689,7 +689,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
     }
 
     @Override
-    public Long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
+    public long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
         return shared.tryAppend(nodeId, leader.epoch(), batch);
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 8ea7daf..11a00a3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.raft.RequestManager.ConnectionState;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.BatchMemoryPool;
 import org.apache.kafka.raft.internals.BlockingMessageQueue;
@@ -2244,24 +2245,23 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
     }
 
     @Override
-    public Long scheduleAppend(int epoch, List<T> records) {
+    public long scheduleAppend(int epoch, List<T> records) {
         return append(epoch, records, false);
     }
 
     @Override
-    public Long scheduleAtomicAppend(int epoch, List<T> records) {
+    public long scheduleAtomicAppend(int epoch, List<T> records) {
         return append(epoch, records, true);
     }
 
-    private Long append(int epoch, List<T> records, boolean isAtomic) {
-        Optional<LeaderState<T>> leaderStateOpt = quorum.maybeLeaderState();
-        if (!leaderStateOpt.isPresent()) {
-            return Long.MAX_VALUE;
-        }
+    private long append(int epoch, List<T> records, boolean isAtomic) {
+        LeaderState<T> leaderState = quorum.<T>maybeLeaderState().orElseThrow(
+            () -> new NotLeaderException("Append failed because the replication is not
the current leader")
+        );
 
-        BatchAccumulator<T> accumulator = leaderStateOpt.get().accumulator();
+        BatchAccumulator<T> accumulator = leaderState.accumulator();
         boolean isFirstAppend = accumulator.isEmpty();
-        final Long offset;
+        final long offset;
         if (isAtomic) {
             offset = accumulator.appendAtomic(epoch, records);
         } else {
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 46cd292..8e4f50e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.raft.errors.BufferAllocationException;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;
 
@@ -136,19 +138,20 @@ public interface RaftClient<T> extends AutoCloseable {
      *
      * If the provided current leader epoch does not match the current epoch, which
      * is possible when the state machine has yet to observe the epoch change, then
-     * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
-     * not possible to become committed. The state machine is expected to discard all
+     * this method will throw a {@link NotLeaderException} to indicate that the leader
+     * should resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
      * @param records the list of records to append
-     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records
could
-     *         be committed; null if no memory could be allocated for the batch at this time
+     * @return the expected offset of the last record if the append succeeds
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of
the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
+     * @throws NotLeaderException if we are not the current leader or the epoch doesn't match
the leader epoch
+     * @throws BufferAllocationException if we failed to allocate memory for the records
      */
-    Long scheduleAppend(int epoch, List<T> records);
+    long scheduleAppend(int epoch, List<T> records);
 
     /**
      * Append a list of records to the log. The write will be scheduled for some time
@@ -158,19 +161,20 @@ public interface RaftClient<T> extends AutoCloseable {
      *
      * If the provided current leader epoch does not match the current epoch, which
      * is possible when the state machine has yet to observe the epoch change, then
-     * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
-     * not possible to become committed. The state machine is expected to discard all
+     * this method will throw a {@link NotLeaderException} to indicate that the leader
+     * should resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
      * @param records the list of records to append
-     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records
could
-     *         be committed; null if no memory could be allocated for the batch at this time
+     * @return the expected offset of the last record if the append succeeds
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of
the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
+     * @throws NotLeaderException if we are not the current leader or the epoch doesn't match
the leader epoch
+     * @throws BufferAllocationException if we failed to allocate memory for the records
      */
-    Long scheduleAtomicAppend(int epoch, List<T> records);
+    long scheduleAtomicAppend(int epoch, List<T> records);
 
     /**
      * Attempt a graceful shutdown of the client. This allows the leader to proactively
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index a5a0a4e..66303c6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -18,6 +18,7 @@ package org.apache.kafka.raft;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;
 import org.slf4j.Logger;
@@ -61,10 +62,13 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer>
{
 
         int epoch = claimedEpoch.getAsInt();
         uncommitted += 1;
-        Long offset = client.scheduleAppend(epoch, singletonList(uncommitted));
-        if (offset != null) {
+        try {
+            long offset = client.scheduleAppend(epoch, singletonList(uncommitted));
             log.debug("Scheduled append of record {} with epoch {} at offset {}",
                 uncommitted, epoch, offset);
+        } catch (NotLeaderException e) {
+            log.info("Appending failed, transition to resigned", e);
+            client.resign(epoch);
         }
     }
 
@@ -103,7 +107,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer>
{
             }
             log.debug("Counter incremented from {} to {}", initialCommitted, committed);
 
-            if (lastOffsetSnapshotted + snapshotDelayInRecords  < lastCommittedOffset)
{
+            if (lastOffsetSnapshotted + snapshotDelayInRecords < lastCommittedOffset)
{
                 log.debug(
                     "Generating new snapshot with committed offset {} and epoch {} since
the previoud snapshot includes {}",
                     lastCommittedOffset,
diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/BufferAllocationException.java
b/raft/src/main/java/org/apache/kafka/raft/errors/BufferAllocationException.java
new file mode 100644
index 0000000..ecd36b3
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/errors/BufferAllocationException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.errors;
+
+/**
+ * Indicates that an operation failed because we could not allocate memory for it.
+ */
+public class BufferAllocationException extends RaftException {
+
+    private final static long serialVersionUID = 1L;
+
+    public BufferAllocationException(String s) {
+        super(s);
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/NotLeaderException.java b/raft/src/main/java/org/apache/kafka/raft/errors/NotLeaderException.java
new file mode 100644
index 0000000..7f737fa
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/errors/NotLeaderException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.errors;
+
+/**
+ * Indicates that an operation is not allowed because this node is not the
+ * current leader or the epoch is not the same as the current leader epoch.
+ */
+public class NotLeaderException extends RaftException {
+
+    private final static long serialVersionUID = 1L;
+
+    public NotLeaderException(String s) {
+        super(s);
+    }
+
+    public NotLeaderException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public NotLeaderException(Throwable throwable) {
+        super(throwable);
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/RaftException.java b/raft/src/main/java/org/apache/kafka/raft/errors/RaftException.java
new file mode 100644
index 0000000..6df196b
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/errors/RaftException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * RaftException is the top-level exception type generated by Kafka raft implementations.
+ */
+public class RaftException extends KafkaException {
+
+    private final static long serialVersionUID = 1L;
+
+    public RaftException(String s) {
+        super(s);
+    }
+
+    public RaftException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public RaftException(Throwable throwable) {
+        super(throwable);
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index bfd6ebc..697394d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.errors.BufferAllocationException;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 
 import org.apache.kafka.common.message.LeaderChangeMessage;
@@ -95,16 +97,17 @@ public class BatchAccumulator<T> implements Closeable {
      * this method can split the records into multiple batches it is possible that some of
the
      * records will get committed while other will not when the leader fails.
      *
-     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
-     *              will be returned as an offset which cannot become committed
+     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
+     *              will be thrown
      * @param records the list of records to include in the batches
-     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch
does not
-     *         match; null if no memory could be allocated for the batch at this time
+     * @return the expected offset of the last record
      * @throws RecordBatchTooLargeException if the size of one record T is greater than the
maximum
      *         batch size; if this exception is throw some of the elements in records may
have
      *         been committed
+     * @throws NotLeaderException if the epoch doesn't match the leader epoch
+     * @throws BufferAllocationException if we failed to allocate memory for the records
      */
-    public Long append(int epoch, List<T> records) {
+    public long append(int epoch, List<T> records) {
         return append(epoch, records, false);
     }
 
@@ -113,22 +116,26 @@ public class BatchAccumulator<T> implements Closeable {
      * same underlying record batch so that either all of the records become committed or
none of
      * them do.
      *
-     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
-     *              will be returned as an offset which cannot become committed
+     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
+     *              will be thrown
      * @param records the list of records to include in a batch
-     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch
does not
-     *         match; null if no memory could be allocated for the batch at this time
+     * @return the expected offset of the last record
      * @throws RecordBatchTooLargeException if the size of the records is greater than the
maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
+     * @throws NotLeaderException if the epoch doesn't match the leader epoch
+     * @throws BufferAllocationException if we failed to allocate memory for the records
      */
-    public Long appendAtomic(int epoch, List<T> records) {
+    public long appendAtomic(int epoch, List<T> records) {
         return append(epoch, records, true);
     }
 
-    private Long append(int epoch, List<T> records, boolean isAtomic) {
-        if (epoch != this.epoch) {
-            return Long.MAX_VALUE;
+    private long append(int epoch, List<T> records, boolean isAtomic) {
+        if (epoch < this.epoch) {
+            throw new NotLeaderException("Append failed because the epoch doesn't match");
+        } else if (epoch > this.epoch) {
+            throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
+                " which is larger than the current epoch " + this.epoch);
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
@@ -148,7 +155,7 @@ public class BatchAccumulator<T> implements Closeable {
                 }
 
                 if (batch == null) {
-                    return null;
+                    throw new BufferAllocationException("Append failed because we failed
to allocate memory to write the batch");
                 }
 
                 batch.appendRecord(record, serializationCache);
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
index 068c219..62fc2d7 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
@@ -43,7 +43,7 @@ import java.util.function.Supplier;
  * topic partition from offset 0 up to but not including the end offset in the snapshot
  * id.
  *
- * @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch)
+ * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
  */
 final public class SnapshotWriter<T> implements AutoCloseable {
     final private RawSnapshotWriter snapshot;
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index d85714b..669a211 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
@@ -34,6 +35,8 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.DescribeQuorumRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochResponse;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.errors.BufferAllocationException;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -270,7 +273,7 @@ public class KafkaRaftClientTest {
         assertEquals(0L, context.log.endOffset().offset);
         context.assertElectedLeader(epoch, localId);
         context.client.poll();
-        assertEquals(Long.MAX_VALUE, context.client.scheduleAppend(epoch, Arrays.asList("a",
"b")));
+        assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch,
Arrays.asList("a", "b")));
 
         context.pollUntilRequest();
         int correlationId = context.assertSentEndQuorumEpochRequest(epoch, 1);
@@ -284,6 +287,84 @@ public class KafkaRaftClientTest {
     }
 
     @Test
+    public void testAppendFailedWithNotLeaderException() throws Exception {
+        int localId = 0;
+        Set<Integer> voters = Utils.mkSet(localId, 1);
+        int epoch = 2;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .build();
+
+        assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch,
Arrays.asList("a", "b")));
+    }
+
+    @Test
+    public void testAppendFailedWithBufferAllocationException() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
+        ByteBuffer leaderBuffer = ByteBuffer.allocate(256);
+        // Return null when allocation error
+        Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
+            .thenReturn(null);
+        Mockito.when(memoryPool.tryAllocate(256))
+            .thenReturn(leaderBuffer);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withMemoryPool(memoryPool)
+            .build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+        int epoch = context.currentEpoch();
+
+        assertThrows(BufferAllocationException.class, () -> context.client.scheduleAppend(epoch,
singletonList("a")));
+    }
+
+    @Test
+    public void testAppendFailedWithFencedEpoch() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+        int epoch = context.currentEpoch();
+
+        // Throws IllegalArgumentException on higher epoch
+        assertThrows(IllegalArgumentException.class, () -> context.client.scheduleAppend(epoch
+ 1, singletonList("a")));
+        // Throws NotLeaderException on smaller epoch
+        assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch
- 1, singletonList("a")));
+    }
+
+    @Test
+    public void testAppendFailedWithRecordBatchTooLargeException() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+        int epoch = context.currentEpoch();
+
+        int size = KafkaRaftClient.MAX_BATCH_SIZE_BYTES / 8 + 1; // 8 is the estimated minimum
size of each record
+        List<String> batchToLarge = new ArrayList<>(size + 1);
+        for (int i = 0; i < size; i++)
+            batchToLarge.add("a");
+
+        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch,
batchToLarge));
+    }
+
+    @Test
     public void testEndQuorumEpochRetriesWhileResigned() throws Exception {
         int localId = 0;
         int voter1 = 1;

Mime
View raw message