kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12620 Allocate Producer IDs in KRaft controller (#10752)
Date Thu, 03 Jun 2021 23:25:11 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e97cff2  KAFKA-12620 Allocate Producer IDs in KRaft controller (#10752)
e97cff2 is described below

commit e97cff2702b6ba836c7925caa36ab18066a7c95d
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Thu Jun 3 19:23:32 2021 -0400

    KAFKA-12620 Allocate Producer IDs in KRaft controller (#10752)
    
    This is part 2 of KIP-730. Part 1 was in #10504.
    
    This PR adds QuorumController support for handling AllocateProducerIDs requests
    and managing the state of the latest producer ID block in the controller by committing
    this state to the metadata log.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   2 +-
 .../common/message/AllocateProducerIdsRequest.json |   2 +-
 .../common/message/InitProducerIdRequest.json      |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  29 ++--
 .../main/scala/kafka/server/ControllerApis.scala   |  17 ++
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../server/metadata/BrokerMetadataListener.scala   |   6 +
 core/src/test/java/kafka/test/MockController.java  |   7 +
 .../transaction/ProducerIdsIntegrationTest.scala   |   3 +-
 .../unit/kafka/server/ControllerApisTest.scala     |   8 +
 .../org/apache/kafka/controller/Controller.java    |  11 ++
 .../kafka/controller/ProducerIdControlManager.java |  85 ++++++++++
 .../apache/kafka/controller/QuorumController.java  |  26 +++-
 .../org/apache/kafka/controller/ResultOrError.java |   8 +
 .../org/apache/kafka/timeline/TimelineLong.java    |   6 +-
 .../common/metadata/ProducerIdsRecord.json         |  14 +-
 .../controller/ProducerIdControlManagerTest.java   | 173 +++++++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     |  10 +-
 18 files changed, 376 insertions(+), 35 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index bd28aa2..3f42ee9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -108,7 +108,7 @@ public enum ApiKeys {
     UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
     DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
-    ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false);
+    ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
index 0cfa494..6f37313 100644
--- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
+++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 67,
   "type": "request",
-  "listeners": ["controller", "zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "AllocateProducerIdsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index 5537aa9..4e75352 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 22,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "InitProducerIdRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index bea0c53..4c76903 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
-import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
+import org.apache.kafka.common.{ClusterResource, Endpoint}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -244,11 +244,18 @@ class BrokerServer(
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
       groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
 
+      val producerIdManagerSupplier = () => ProducerIdManager.rpc(
+        config.brokerId,
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch(),
+        clientToControllerChannelManager,
+        config.requestTimeoutMs
+      )
+
       // Create transaction coordinator, but don't start it until we've started replica manager.
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
       transactionCoordinator = TransactionCoordinator(config, replicaManager,
         new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
-        createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
+        producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
 
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
         config, Some(clientToControllerChannelManager), None, None,
@@ -376,24 +383,6 @@ class BrokerServer(
     }
   }
 
-  class TemporaryProducerIdManager() extends ProducerIdManager {
-    val maxProducerIdsPerBrokerEpoch = 1000000
-    var currentOffset = -1
-    override def generateProducerId(): Long = {
-      currentOffset = currentOffset + 1
-      if (currentOffset >= maxProducerIdsPerBrokerEpoch) {
-        fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch")
-        throw new KafkaException("Have exhausted all demo/temporary producerIds.")
-      }
-      lifecycleManager.initialCatchUpFuture.get()
-      lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset
-    }
-  }
-
-  def createTemporaryProducerIdManager(): ProducerIdManager = {
-    new TemporaryProducerIdManager()
-  }
-
   def shutdown(): Unit = {
     if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
     try {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index fa0dc79..96822e8 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -100,6 +100,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
+        case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
         case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
         case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
@@ -767,4 +768,20 @@ class ControllerApis(val requestChannel: RequestChannel,
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
+
+  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
+    val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    controller.allocateProducerIds(allocatedProducerIdsRequest.data)
+      .whenComplete((results, exception) => {
+        if (exception != null) {
+          requestHelper.handleError(request, exception)
+        } else {
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+            results.setThrottleTimeMs(requestThrottleMs)
+            new AllocateProducerIdsResponse(results)
+          })
+        }
+      })
+  }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c5e1404..2870bce6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -216,7 +216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
         case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
-        case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
+        case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 70e44c8..4b4d15e 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -182,6 +182,7 @@ class BrokerMetadataListener(
       case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
       case rec: ConfigRecord => handleConfigRecord(rec)
       case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec)
+      case rec: ProducerIdsRecord => handleProducerIdRecord(rec)
       case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
     }
   }
@@ -259,6 +260,11 @@ class BrokerMetadataListener(
     clientQuotaManager.handleQuotaRecord(record)
   }
 
+  def handleProducerIdRecord(record: ProducerIdsRecord): Unit = {
+    // This is a no-op since brokers get their producer ID blocks directly from the controller via
+    // AllocateProducerIds RPC response
+  }
+
   class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
       extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index 1fba295..68e73fd 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
@@ -302,6 +304,11 @@ public class MockController implements Controller {
     }
 
     @Override
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(AllocateProducerIdsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
             createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {
         if (!active) {
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 6c7c248..d5e082a 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -44,7 +44,8 @@ class ProducerIdsIntegrationTest {
 
   @ClusterTests(Array(
     new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"),
-    new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0")
+    new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"),
+    new ClusterTest(clusterType = Type.RAFT, brokers = 3, ibp = "3.0-IV0")
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 418ee31..89fbd05 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -411,6 +411,14 @@ class ControllerApisTest {
   }
 
   @Test
+  def testUnauthorizedHandleAllocateProducerIds(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+      handleAllocateProducerIdsRequest(buildRequest(new AllocateProducerIdsRequest.Builder(
+        new AllocateProducerIdsRequestData()).build())))
+  }
+
+  @Test
   def testUnauthorizedHandleListPartitionReassignments(): Unit = {
     assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
       Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index a34b084..3cb0d26 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -20,6 +20,8 @@ package org.apache.kafka.controller;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
@@ -224,6 +226,15 @@ public interface Controller extends AutoCloseable {
     );
 
     /**
+     * Allocate a block of producer IDs for transactional and idempotent producers
+     * @param request   The allocate producer IDs request
+     * @return          A future which yields a new producer ID block as a response
+     */
+    CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+        AllocateProducerIdsRequestData request
+    );
+
+    /**
      * Begin writing a controller snapshot.  If there was already an ongoing snapshot, it
      * simply returns information about that snapshot rather than starting a new one.
      *
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
new file mode 100644
index 0000000..924605c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ProducerIdsBlock;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineLong;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class ProducerIdControlManager {
+
+    private final ClusterControlManager clusterControlManager;
+    private final TimelineLong lastProducerId;
+
+    ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
+        this.clusterControlManager = clusterControlManager;
+        this.lastProducerId = new TimelineLong(snapshotRegistry, 0L);
+    }
+
+    ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
+        clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
+
+        long producerId = lastProducerId.get();
+
+        if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
+            throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " +
+                "is will has exceeded long type limit");
+        }
+
+        long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE;
+        ProducerIdsRecord record = new ProducerIdsRecord()
+            .setProducerIdsEnd(nextProducerId)
+            .setBrokerId(brokerId)
+            .setBrokerEpoch(brokerEpoch);
+        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
+        return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block);
+    }
+
+    void replay(ProducerIdsRecord record) {
+        long currentProducerId = lastProducerId.get();
+        if (record.producerIdsEnd() <= currentProducerId) {
+            throw new RuntimeException("Producer ID from record is not monotonically increasing");
+        } else {
+            lastProducerId.set(record.producerIdsEnd());
+        }
+    }
+
+    Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
+        List<ApiMessageAndVersion> records = new ArrayList<>(1);
+
+        long producerId = lastProducerId.get(epoch);
+        if (producerId > 0) {
+            records.add(new ApiMessageAndVersion(
+                new ProducerIdsRecord()
+                    .setProducerIdsEnd(producerId)
+                    .setBrokerId(0)
+                    .setBrokerEpoch(0L),
+                (short) 0));
+        }
+        return Collections.singleton(records).iterator();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 746d906..bee0b69 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
@@ -44,6 +46,7 @@ import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.QuotaRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
@@ -359,7 +362,8 @@ public final class QuorumController implements Controller {
                     new Section("cluster", clusterControl.iterator(epoch)),
                     new Section("replication", replicationControl.iterator(epoch)),
                     new Section("configuration", configurationControl.iterator(epoch)),
-                    new Section("clientQuotas", clientQuotaControlManager.iterator(epoch))));
+                    new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)),
+                    new Section("producerIds", producerIdControlManager.iterator(epoch))));
             reschedule(0);
         }
 
@@ -855,6 +859,9 @@ public final class QuorumController implements Controller {
                 case QUOTA_RECORD:
                     clientQuotaControlManager.replay((QuotaRecord) message);
                     break;
+                case PRODUCER_IDS_RECORD:
+                    producerIdControlManager.replay((ProducerIdsRecord) message);
+                    break;
                 default:
                     throw new RuntimeException("Unhandled record type " + type);
             }
@@ -930,6 +937,12 @@ public final class QuorumController implements Controller {
     private final FeatureControlManager featureControl;
 
     /**
+     * An object which stores the controller's view of the latest producer ID
+     * that has been generated. This must be accessed only by the event queue thread.
+     */
+    private final ProducerIdControlManager producerIdControlManager;
+
+    /**
      * An object which stores the controller's view of topics and partitions.
      * This must be accessed only by the event queue thread.
      */
@@ -995,6 +1008,7 @@ public final class QuorumController implements Controller {
         this.clusterControl = new ClusterControlManager(logContext, time,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
+        this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
         this.replicationControl = new ReplicationControlManager(snapshotRegistry,
             logContext, defaultReplicationFactor, defaultNumPartitions,
@@ -1200,6 +1214,16 @@ public final class QuorumController implements Controller {
     }
 
     @Override
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+            AllocateProducerIdsRequestData request) {
+        return appendWriteEvent("allocateProducerIds",
+            () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
+            .thenApply(result -> new AllocateProducerIdsResponseData()
+                    .setProducerIdStart(result.producerIdStart())
+                    .setProducerIdLen(result.producerIdLen()));
+    }
+
+    @Override
     public CompletableFuture<List<CreatePartitionsTopicResult>>
             createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {
         if (topics.isEmpty()) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
index 2fedacd..6a548c4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
@@ -42,6 +42,14 @@ public class ResultOrError<T> {
         this.result = result;
     }
 
+    public static <T> ResultOrError<T> of(T result) {
+        return new ResultOrError<>(result);
+    }
+
+    public static <T> ResultOrError<T> of(ApiError error) {
+        return new ResultOrError<>(error);
+    }
+
     public boolean isError() {
         return error != null;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
index e057391..36a300f 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
@@ -47,8 +47,12 @@ public class TimelineLong implements Revertable {
     private long value;
 
     public TimelineLong(SnapshotRegistry snapshotRegistry) {
+        this(snapshotRegistry, 0L);
+    }
+
+    public TimelineLong(SnapshotRegistry snapshotRegistry, long value) {
         this.snapshotRegistry = snapshotRegistry;
-        this.value = 0;
+        this.value = value;
     }
 
     public long get() {
diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
similarity index 79%
copy from clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
copy to metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
index 0cfa494..09e6b53 100644
--- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
+++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
@@ -9,21 +9,21 @@
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
 {
-  "apiKey": 67,
-  "type": "request",
-  "listeners": ["controller", "zkBroker"],
-  "name": "AllocateProducerIdsRequest",
+  "apiKey": 15,
+  "type": "metadata",
+  "name": "ProducerIdsRecord",
   "validVersions": "0",
-  "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The ID of the requesting broker" },
     { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
-      "about": "The epoch of the requesting broker" }
+      "about": "The epoch of the requesting broker" },
+    { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+",
+      "about": "The highest producer ID that has been generated"}
   ]
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
new file mode 100644
index 0000000..f96510d
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ProducerIdsBlock;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class ProducerIdControlManagerTest {
+
+    private SnapshotRegistry snapshotRegistry;
+    private ClusterControlManager clusterControl;
+    private ProducerIdControlManager producerIdControlManager;
+
+    @BeforeEach
+    public void setUp() {
+        final LogContext logContext = new LogContext();
+        final MockTime time = new MockTime();
+        final Random random = new Random();
+        snapshotRegistry = new SnapshotRegistry(logContext);
+        clusterControl = new ClusterControlManager(
+            logContext, time, snapshotRegistry, 1000,
+            new StripedReplicaPlacer(random));
+
+        clusterControl.activate();
+        for (int i = 0; i < 4; i++) {
+            RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(i);
+            brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
+                    setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+                    setPort((short) 9092).
+                    setName("PLAINTEXT").
+                    setHost(String.format("broker-%02d.example.org", i)));
+            clusterControl.replay(brokerRecord);
+        }
+
+        this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
+    }
+
+    @Test
+    public void testInitialResult() {
+        ControllerResult<ProducerIdsBlock> result =
+            producerIdControlManager.generateNextProducerId(1, 100);
+        assertEquals(0, result.response().producerIdStart());
+        assertEquals(1000, result.response().producerIdLen());
+        ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message();
+        assertEquals(1000, record.producerIdsEnd());
+    }
+
+    @Test
+    public void testMonotonic() {
+        producerIdControlManager.replay(
+            new ProducerIdsRecord()
+                .setBrokerId(1)
+                .setBrokerEpoch(100)
+                .setProducerIdsEnd(42));
+
+        ProducerIdsBlock range =
+            producerIdControlManager.generateNextProducerId(1, 100).response();
+        assertEquals(42, range.producerIdStart());
+
+        // Can't go backwards in Producer IDs
+        assertThrows(RuntimeException.class, () -> {
+            producerIdControlManager.replay(
+                new ProducerIdsRecord()
+                    .setBrokerId(1)
+                    .setBrokerEpoch(100)
+                    .setProducerIdsEnd(40));
+        }, "Producer ID range must only increase");
+        range = producerIdControlManager.generateNextProducerId(1, 100).response();
+        assertEquals(42, range.producerIdStart());
+
+        // Gaps in the ID range are okay.
+        producerIdControlManager.replay(
+            new ProducerIdsRecord()
+                .setBrokerId(1)
+                .setBrokerEpoch(100)
+                .setProducerIdsEnd(50));
+        range = producerIdControlManager.generateNextProducerId(1, 100).response();
+        assertEquals(50, range.producerIdStart());
+    }
+
+    @Test
+    public void testUnknownBrokerOrEpoch() {
+        ControllerResult<ProducerIdsBlock> result;
+
+        assertThrows(StaleBrokerEpochException.class, () ->
+            producerIdControlManager.generateNextProducerId(99, 0));
+
+        assertThrows(StaleBrokerEpochException.class, () ->
+            producerIdControlManager.generateNextProducerId(1, 99));
+    }
+
+    @Test
+    public void testMaxValue() {
+        producerIdControlManager.replay(
+            new ProducerIdsRecord()
+                .setBrokerId(1)
+                .setBrokerEpoch(100)
+                .setProducerIdsEnd(Long.MAX_VALUE - 1));
+
+        assertThrows(UnknownServerException.class, () ->
+            producerIdControlManager.generateNextProducerId(1, 100));
+    }
+
+    @Test
+    public void testSnapshotIterator() {
+        ProducerIdsBlock range = null;
+        for (int i = 0; i < 100; i++) {
+            range = generateProducerIds(producerIdControlManager, i % 4, 100);
+        }
+
+        Iterator<List<ApiMessageAndVersion>> snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE);
+        assertTrue(snapshotIterator.hasNext());
+        List<ApiMessageAndVersion> batch = snapshotIterator.next();
+        assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record");
+        assertEquals(range.producerIdStart() + range.producerIdLen(), ((ProducerIdsRecord) batch.get(0).message()).producerIdsEnd());
+        assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch");
+
+        ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
+        snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE);
+        while (snapshotIterator.hasNext()) {
+            snapshotIterator.next().forEach(message -> newProducerIdManager.replay((ProducerIdsRecord) message.message()));
+        }
+
+        // Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs
+        long lastProducerID = range.producerIdStart() + range.producerIdLen() - 1;
+        range = generateProducerIds(producerIdControlManager, 1, 100);
+        assertTrue(range.producerIdStart() > lastProducerID);
+    }
+
+    static ProducerIdsBlock generateProducerIds(
+            ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) {
+        ControllerResult<ProducerIdsBlock> result =
+            producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch);
+        result.records().forEach(apiMessageAndVersion ->
+            producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message()));
+        return result.response();
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index c6114de..5a39f82 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@@ -54,6 +55,7 @@ import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -267,6 +269,8 @@ public class QuorumControllerTest {
                                         setBrokerIds(Arrays.asList(1, 2, 0))).
                                             iterator()))).iterator()))).get();
                 fooId = fooData.topics().find("foo").topicId();
+                active.allocateProducerIds(
+                    new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
                 long snapshotEpoch = active.beginWritingSnapshot().get();
                 writer = snapshotWriterBuilder.writers.takeFirst();
                 assertEquals(snapshotEpoch, writer.epoch());
@@ -338,7 +342,11 @@ public class QuorumControllerTest {
                 setEndPoints(new BrokerEndpointCollection(Arrays.asList(
                     new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                         setPort(9095).setSecurityProtocol((short) 0)).iterator())).
-                setRack(null), (short) 0))),
+                setRack(null), (short) 0)),
+            Arrays.asList(new ApiMessageAndVersion(new ProducerIdsRecord().
+                setBrokerId(0).
+                setBrokerEpoch(brokerEpochs.get(0)).
+                setProducerIdsEnd(1000), (short) 0))),
             iterator);
     }
 

Mime
View raw message