/*
 * Decompiled with CFR 0.152.
 */
package kafka.coordinator.transaction;

import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.network.SocketServer;
import kafka.server.IntegrationTestUtils$;
import kafka.server.KafkaConfig$;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.AutoStart;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.NotNothing$;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ClusterTestExtensions.class})
@ScalaSignature(bytes="\u0006\u0001\u0005ec\u0001B\u0005\u000b\u0001EAQ\u0001\u0007\u0001\u0005\u0002eAQ\u0001\b\u0001\u0005\u0002uAQA\u000e\u0001\u0005\u0002]BQA\u001b\u0001\u0005\u0002-DQ\u0001\u001e\u0001\u0005\u0002UDQA \u0001\u0005\u0002}Dq!!\u0004\u0001\t\u0013\ty\u0001C\u0004\u0002\u0014\u0001!I!!\u0006\u00035A\u0013x\u000eZ;dKJLEm]%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u0005-a\u0011a\u0003;sC:\u001c\u0018m\u0019;j_:T!!\u0004\b\u0002\u0017\r|wN\u001d3j]\u0006$xN\u001d\u0006\u0002\u001f\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0013!\t\u0019b#D\u0001\u0015\u0015\u0005)\u0012!B:dC2\f\u0017BA\f\u0015\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012A\u0007\t\u00037\u0001i\u0011AC\u0001\u0006g\u0016$X\u000f\u001d\u000b\u0003=\u0005\u0002\"aE\u0010\n\u0005\u0001\"\"\u0001B+oSRDQA\t\u0002A\u0002\r\nQb\u00197vgR,'oQ8oM&<\u0007C\u0001\u0013(\u001b\u0005)#B\u0001\u0014\u000f\u0003\u0011!Xm\u001d;\n\u0005!*#!D\"mkN$XM]\"p]\u001aLw\r\u000b\u0002\u0003UA\u00111\u0006N\u0007\u0002Y)\u0011QFL\u0001\u0004CBL'BA\u00181\u0003\u001dQW\u000f]5uKJT!!\r\u001a\u0002\u000b),h.\u001b;\u000b\u0003M\n1a\u001c:h\u0013\t)DF\u0001\u0006CK\u001a|'/Z#bG\"\fQ\u0003^3tiVs\u0017.];f!J|G-^2fe&#7\u000f\u0006\u0002\u001fq!)\u0011h\u0001a\u0001u\u0005y1\r\\;ti\u0016\u0014\u0018J\\:uC:\u001cW\r\u0005\u0002%w%\u0011A(\n\u0002\u0010\u00072,8\u000f^3s\u0013:\u001cH/\u00198dK\"\"1A\u0010#F!\ty$)D\u0001A\u0015\t\tU%\u0001\u0006b]:|G/\u0019;j_:L!a\u0011!\u0003\u0019\rcWo\u001d;feR+7\u000f^:\u0002\u000bY\fG.^3-\u0007\u0019{6mK\u0004H\u0015.\u0003\u0016KU*\u0011\u0005}B\u0015BA%A\u0005-\u0019E.^:uKJ$Vm\u001d;\u0002\u0017\rdWo\u001d;feRK\b/\u001a\u0013\u0002\u0019&\u0011QJT\u0001\u00035.S!a\u0014!\u0002\tQK\b/Z\u0001\bEJ|7.\u001a:t;\u0005\u0019\u0011aD7fi\u0006$\u0017\r^1WKJ\u001c\u0018n\u001c8%\u0003QK!!\u0016,\u0002\u0017%\u0013\u0005k\u0018\u001a`q}Ke+\r\u0006\u0003/b\u000bq\"T3uC\u0012\fG/\u0019,feNLwN\u001c\u0006\u00033j\u000baaY8n[>t'BA.]\u0003\u0019\u0019XM\u001d<fe*\u0011q\"\u0018\u0006\u0003=
J\na!\u00199bG\",7fB$K\u0017B\u000b&\u000b\u0019\u0013\u0002C&\u0011!MV\u0001\f\u0013\n\u0003vlM01?&3\u0006gK\u0004H\u0015\u0012\u0004\u0016KU4%\u0003\u0015L!A\u001a(\u0002\u000b-\u0013\u0016I\u0012+%\u0003!L!!\u001b,\u0002\u0017%\u0013\u0005kX\u001a`g}Ke\u000bM\u0001\u001di\u0016\u001cH/\u00168jcV,\u0007K]8ek\u000e,'/\u00133t\u0005Vl\u0007/\u0013\"Q)\tqB\u000eC\u0003:\t\u0001\u0007!\b\u000b\u0005\u0005\u000f*[\u0005+\u00158p\u0003%\tW\u000f^8Ti\u0006\u0014H\u000fJ\u0001q\u0013\t\t(/\u0001\u0002O\u001f*\u00111\u000fQ\u0001\n\u0003V$xn\u0015;beR\fq\u0007^3ti\"\u000bg\u000e\u001a7f\u00032dwnY1uKB\u0013x\u000eZ;dKJLEm]*j]\u001edWMU3rk\u0016\u001cH\u000fS1oI2,'\u000f\u00165sK\u0006$GC\u0001\u0010w\u0011\u0015IT\u00011\u0001;Q\u0011)\u0001\u0010R>\u0011\u0005-J\u0018B\u0001>-\u0005\u001d!\u0016.\\3pkRt\u0012\u0001\u0006\u0015\t\u000b\u001dS5\nU?o_v\t\u0011!\u0001\u0014uKN$X*\u001e7uSBdW-\u00117m_\u000e\fG/\u001a)s_\u0012,8-\u001a:JIN\u0014V-];fgR$2AHA\u0001\u0011\u0015Id\u00011\u0001;Q!1qIS&Q{:|\u0007f\u0001\u0004\u0002\bA\u00191&!\u0003\n\u0007\u0005-AF\u0001\u0005ESN\f'\r\\3e\u0003=1XM]5gsVs\u0017.];f\u0013\u0012\u001cHc\u0001\u0010\u0002\u0012!)\u0011h\u0002a\u0001u\u0005qa.\u001a=u!J|G-^2fe&#GCBA\f\u0003;\ti\u0003E\u0002\u0014\u00033I1!a\u0007\u0015\u0005\u0011auN\\4\t\u000f\u0005}\u0001\u00021\u0001\u0002\"\u00051!M]8lKJ\u0004B!a\t\u0002*5\u0011\u0011Q\u0005\u0006\u0004\u0003Oq\u0011a\u00028fi^|'o[\u0005\u0005\u0003W\t)C\u0001\u0007T_\u000e\\W\r^*feZ,'\u000fC\u0004\u00020!\u0001\r!!\r\u0002\u00111L7\u000f^3oKJ\u0004B!a\r\u0002:5\u0011\u0011Q\u0007\u0006\u0005\u0003O\t9D\u0003\u0002Z9&!\u00111HA\u001b\u00051a\u0015n\u001d;f]\u0016\u0014h*Y7fQ\u0019\u0001\u0011q\b#\u0002LA!\u0011\u0011IA$\u001b\t\t\u0019EC\u0002\u0002F1\n\u0011\"\u001a=uK:\u001c\u0018n\u001c8\n\t\u0005%\u00131\t\u0002\u000b\u000bb$XM\u001c3XSRDGFAA'G\t\ty\u0005\u0005\u0003\u0002R\u0005USBAA*\u0015\t\tT%\u0003\u0003\u0002X\u0005M#!F\"mkN$XM\u001d+fgR,\u0005\u0010^3og&|gn\u001d")
public class ProducerIdsIntegrationTest {
    /**
     * Adjusts the server properties of the cluster configuration before each test:
     * the transactions-topic partition count is set to "1" and its replication
     * factor to "3".
     *
     * @param clusterConfig cluster configuration whose server properties are modified in place
     */
    @BeforeEach
    public void setup(ClusterConfig clusterConfig) {
        String partitionsKey = KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp();
        clusterConfig.serverProperties().put(partitionsKey, "1");

        String replicationFactorKey = KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp();
        clusterConfig.serverProperties().put(replicationFactorKey, "3");
    }

    /**
     * Checks producer ID uniqueness on three-broker clusters: two ZK-based clusters
     * (metadata versions IBP_2_8_IV1 and IBP_3_0_IV0) and one KRaft cluster (IBP_3_3_IV0).
     *
     * @param clusterInstance the cluster supplied by the test extension
     */
    @ClusterTests(value={
        @ClusterTest(clusterType=Type.ZK, brokers=3, metadataVersion=MetadataVersion.IBP_2_8_IV1),
        @ClusterTest(clusterType=Type.ZK, brokers=3, metadataVersion=MetadataVersion.IBP_3_0_IV0),
        @ClusterTest(clusterType=Type.KRAFT, brokers=3, metadataVersion=MetadataVersion.IBP_3_3_IV0)
    })
    public void testUniqueProducerIds(ClusterInstance clusterInstance) {
        verifyUniqueIds(clusterInstance);
    }

    /**
     * Checks producer ID uniqueness on a three-broker ZK cluster whose brokers run with
     * different inter-broker protocol versions: "2.8" is set in the shared server
     * properties and "3.0-IV0" in the properties of broker 0.
     *
     * <p>The cluster is declared with {@code autoStart=NO}, so it is started here after
     * the properties have been adjusted, and stopped again when verification ends.
     *
     * @param clusterInstance the (not yet started) cluster supplied by the test extension
     */
    @ClusterTest(clusterType=Type.ZK, brokers=3, autoStart=AutoStart.NO)
    public void testUniqueProducerIdsBumpIBP(ClusterInstance clusterInstance) {
        clusterInstance.config().serverProperties().put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "2.8");
        clusterInstance.config().brokerServerProperties(0).put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "3.0-IV0");
        clusterInstance.start();
        try {
            this.verifyUniqueIds(clusterInstance);
        } finally {
            // Stop the manually started cluster even when verification throws,
            // so a failing assertion does not leave the brokers running.
            clusterInstance.stop();
        }
    }

    /**
     * Checks producer ID uniqueness on a single-broker ZK cluster configured with the
     * number-of-IO-threads property set to "1". The test is bounded by a JUnit
     * {@code @Timeout} of 20 (seconds, the JUnit default unit).
     *
     * <p>The cluster is declared with {@code autoStart=NO}, so it is started here after
     * the property has been set, and stopped again when verification ends.
     *
     * @param clusterInstance the (not yet started) cluster supplied by the test extension
     */
    @ClusterTest(clusterType=Type.ZK, brokers=1, autoStart=AutoStart.NO)
    @Timeout(value=20L)
    public void testHandleAllocateProducerIdsSingleRequestHandlerThread(ClusterInstance clusterInstance) {
        clusterInstance.config().serverProperties().put(KafkaConfig$.MODULE$.NumIoThreadsProp(), "1");
        clusterInstance.start();
        try {
            this.verifyUniqueIds(clusterInstance);
        } finally {
            // Stop the manually started cluster even when verification throws or the
            // timeout aborts it, so the broker is not left running.
            clusterInstance.stop();
        }
    }

    /**
     * Checks producer ID uniqueness on a single-broker ZK cluster configured with the
     * number-of-IO-threads property set to "2". Currently marked {@code @Disabled}.
     *
     * <p>The cluster is declared with {@code autoStart=NO}, so it is started here after
     * the property has been set, and stopped again when verification ends.
     *
     * @param clusterInstance the (not yet started) cluster supplied by the test extension
     */
    @Disabled
    @ClusterTest(clusterType=Type.ZK, brokers=1, autoStart=AutoStart.NO)
    public void testMultipleAllocateProducerIdsRequest(ClusterInstance clusterInstance) {
        clusterInstance.config().serverProperties().put(KafkaConfig$.MODULE$.NumIoThreadsProp(), "2");
        clusterInstance.start();
        try {
            this.verifyUniqueIds(clusterInstance);
        } finally {
            // Stop the manually started cluster even when verification throws,
            // so a failing assertion does not leave the broker running.
            clusterInstance.stop();
        }
    }

    /**
     * Requests 1001 producer IDs from each broker socket server of the cluster (the
     * per-broker requests are issued from a parallel stream) and asserts that the
     * total number of IDs matches 1001 times the broker count and that no ID repeats.
     *
     * @param clusterInstance running cluster whose brokers are queried
     */
    private void verifyUniqueIds(ClusterInstance clusterInstance) {
        final int idsPerBroker = 1001;

        // Gather the IDs on the Java side first ...
        java.util.List<Long> collected = clusterInstance.brokerSocketServers().stream()
            .flatMap(server -> IntStream.range(0, idsPerBroker)
                .parallel()
                .mapToObj(ignored -> BoxesRunTime.boxToLong((long)this.nextProducerId(server, clusterInstance.clientListener()))))
            .collect(Collectors.toList());

        // ... then view them as a Scala Seq, whose size/distinct are used for the checks.
        Seq producerIds = ((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(collected).asScala()).toSeq();

        int expectedTotalCount = idsPerBroker * clusterInstance.brokerIds().size();

        int actualCount = producerIds.size();
        Assertions.assertEquals((int)expectedTotalCount, (int)actualCount, (String)("Expected exactly " + expectedTotalCount + " IDs"));

        int distinctCount = ((SeqLike)producerIds.distinct()).size();
        Assertions.assertEquals((int)expectedTotalCount, (int)distinctCount, (String)"Found duplicate producer IDs");
    }

    /**
     * Obtains one producer ID from the given broker by sending an InitProducerId request
     * (producer epoch -1, producer ID -1, no transactional ID, transaction timeout 10 ms).
     *
     * <p>The request is re-sent for as long as the broker answers with
     * {@code COORDINATOR_LOAD_IN_PROGRESS} and a 5-second deadline has time left.
     * Afterwards the method asserts that the deadline has not elapsed and that the
     * final response carries error code {@code NONE}.
     *
     * @param broker   socket server of the broker to contact
     * @param listener listener name used for the connection
     * @return the producer ID contained in the successful response
     */
    private long nextProducerId(SocketServer broker, ListenerName listener) {
        Deadline deadline = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds().fromNow();
        boolean shouldRetry = true;
        InitProducerIdResponse response = null;
        while (shouldRetry && deadline.hasTimeLeft()) {
            InitProducerIdRequestData data = new InitProducerIdRequestData().setProducerEpoch((short)-1).setProducerId(-1L).setTransactionalId(null).setTransactionTimeoutMs(10);
            InitProducerIdRequest request = (InitProducerIdRequest)new InitProducerIdRequest.Builder(data).build();
            response = (InitProducerIdResponse)IntegrationTestUtils$.MODULE$.connectAndReceive((AbstractRequest)request, broker, listener, ClassTag$.MODULE$.apply(InitProducerIdResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()));
            // Only COORDINATOR_LOAD_IN_PROGRESS triggers another attempt; any other
            // result (success or a different error) ends the loop.
            shouldRetry = response.data().errorCode() == Errors.COORDINATOR_LOAD_IN_PROGRESS.code();
        }
        // Failure messages make it clear which of the two conditions broke the test.
        Assertions.assertTrue((boolean)deadline.hasTimeLeft(), (String)"The 5 second deadline elapsed before an InitProducerId request completed");
        short errorCode = response.data().errorCode();
        Assertions.assertEquals((short)Errors.NONE.code(), (short)errorCode, (String)("InitProducerId request failed with error " + Errors.forCode(errorCode)));
        return response.data().producerId();
    }
}

