/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ElectionState;
import org.apache.kafka.raft.ExpirationService;
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.MockExpirationService;
import org.apache.kafka.raft.MockLog;
import org.apache.kafka.raft.MockMessageQueue;
import org.apache.kafka.raft.MockNetworkChannel;
import org.apache.kafka.raft.MockQuorumStateStore;
import org.apache.kafka.raft.MockableRandom;
import org.apache.kafka.raft.NetworkChannel;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.QuorumStateStore;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.RaftMessage;
import org.apache.kafka.raft.RaftMessageQueue;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.RaftUtil;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;

public final class RaftClientTestContext {
    // Serde for the String records handled by the client under test.
    // NOTE(review): Builder.access$000() and Builder.access$100() look like compiler-synthesized
    // accessors surfaced by the decompiler; the hand-written source presumably reads constants on
    // Builder directly (the static buildBatch references Builder.SERDE) - confirm before recompiling.
    public final RecordSerde<String> serde = Builder.access$000();
    // Topic partition used by the request/response factory and assertion helpers of this context.
    final TopicPartition metadataPartition = Builder.access$100();
    final Uuid metadataTopicId = Uuid.METADATA_TOPIC_ID;
    // Fixed timing values; presumably mirror the configuration given to the client - TODO confirm.
    final int electionBackoffMaxMs = 100;
    final int fetchMaxWaitMs = 0;
    final int fetchTimeoutMs = 50000;
    final int retryBackoffMs = 50;
    // Not assigned by the constructor of this class; presumably populated by Builder - TODO confirm.
    private int electionTimeoutMs;
    private int requestTimeoutMs;
    private int appendLingerMs;
    // Store whose election state is read back by currentLeaderAndEpoch() and the assert*Leader helpers.
    private final QuorumStateStore quorumStateStore;
    final Uuid clusterId;
    // Empty when no local id is defined; localIdOrThrow() fails in that case.
    private final OptionalInt localId;
    public final KafkaRaftClient<String> client;
    final Metrics metrics;
    public final MockLog log;
    final MockNetworkChannel channel;
    final MockMessageQueue messageQueue;
    final MockTime time;
    final MockListener listener;
    final Set<Integer> voters;
    // Responses completed for requests passed to deliverRequest(); removed again by drainSentResponses().
    private final List<RaftResponse.Outbound> sentResponses = new ArrayList<RaftResponse.Outbound>();

    /**
     * Stores the collaborators of a test context. Every argument is kept as-is in the field of the same name;
     * no validation or copying is performed.
     */
    private RaftClientTestContext(Uuid clusterId, OptionalInt localId, KafkaRaftClient<String> client, MockLog log, MockNetworkChannel channel, MockMessageQueue messageQueue, MockTime time, QuorumStateStore quorumStateStore, Set<Integer> voters, Metrics metrics, MockListener listener) {
        // Identity of the cluster and of the local node.
        this.clusterId = clusterId;
        this.localId = localId;
        this.voters = voters;

        // The client under test and the state it persists.
        this.client = client;
        this.quorumStateStore = quorumStateStore;
        this.log = log;

        // Mocked infrastructure surrounding the client.
        this.channel = channel;
        this.messageQueue = messageQueue;
        this.time = time;
        this.metrics = metrics;
        this.listener = listener;
    }

    /**
     * @return the current value of the {@code electionTimeoutMs} field
     */
    int electionTimeoutMs() {
        return electionTimeoutMs;
    }

    /**
     * @return the current value of the {@code requestTimeoutMs} field
     */
    int requestTimeoutMs() {
        return requestTimeoutMs;
    }

    /**
     * @return the current value of the {@code appendLingerMs} field
     */
    int appendLingerMs() {
        return appendLingerMs;
    }

    /**
     * Builds a record batch stamped with the mock clock's current time.
     *
     * @param baseOffset base offset passed through to the static overload
     * @param epoch      epoch passed through to the static overload
     * @param records    record values to append, in order
     * @return the batch produced by the static {@code buildBatch} overload
     */
    MemoryRecords buildBatch(long baseOffset, int epoch, List<String> records) {
        long timestamp = time.milliseconds();
        return buildBatch(timestamp, baseOffset, epoch, records);
    }

    /**
     * Builds a single uncompressed record batch from the given string records.
     *
     * <p>The batch is written into a freshly allocated 512-byte buffer, and the same value is passed to the
     * builder as its last constructor argument.
     *
     * @param timestamp  timestamp handed to the batch builder
     * @param baseOffset base offset handed to the batch builder
     * @param epoch      epoch handed to the batch builder
     * @param records    record values to append, in iteration order
     * @return the records produced by {@code BatchBuilder.build()}
     */
    static MemoryRecords buildBatch(long timestamp, long baseOffset, int epoch, List<String> records) {
        int bufferSize = 512;
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        // Parameterized builder: lets the compiler check the record type instead of going through Object.
        BatchBuilder<String> builder = new BatchBuilder<>(buffer, Builder.SERDE, CompressionType.NONE, baseOffset, timestamp, false, epoch, bufferSize);
        for (String record : records) {
            builder.appendRecord(record, null);
        }
        return builder.build();
    }

    /**
     * Creates a context that starts with an unknown leader in {@code epoch - 1} and then drives it through
     * {@link #becomeLeader()}.
     *
     * @param localId id of the local node
     * @param voters  voter set handed to the builder
     * @param epoch   epoch in which the node should lead; must be positive
     * @return the context after {@code becomeLeader()} has returned
     * @throws IllegalArgumentException if {@code epoch <= 0}
     */
    static RaftClientTestContext initializeAsLeader(int localId, Set<Integer> voters, int epoch) throws Exception {
        if (epoch <= 0) {
            throw new IllegalArgumentException("Cannot become leader in epoch " + epoch);
        }
        int startingEpoch = epoch - 1;
        RaftClientTestContext context = new Builder(localId, voters)
            .withUnknownLeader(startingEpoch)
            .build();
        context.assertUnknownLeader(startingEpoch);
        context.becomeLeader();
        return context;
    }

    /**
     * Drives the local node to leadership in the epoch following the currently persisted one.
     *
     * <p>Advances the mock clock by twice the election timeout, grants every vote request sent for the next
     * epoch, and then answers the resulting BeginQuorumEpoch requests.
     *
     * @throws Exception if polling or any of the assertions in the helper methods fails
     */
    public void becomeLeader() throws Exception {
        int nextEpoch = currentEpoch() + 1;
        // Widen to long before multiplying so a large timeout cannot overflow int arithmetic.
        time.sleep(2L * electionTimeoutMs);
        expectAndGrantVotes(nextEpoch);
        expectBeginEpoch(nextEpoch);
    }

    /**
     * @return the leader id from {@link #currentLeaderAndEpoch()}
     */
    public OptionalInt currentLeader() {
        LeaderAndEpoch leaderAndEpoch = currentLeaderAndEpoch();
        return leaderAndEpoch.leaderId();
    }

    /**
     * @return the epoch from {@link #currentLeaderAndEpoch()}
     */
    public int currentEpoch() {
        LeaderAndEpoch leaderAndEpoch = currentLeaderAndEpoch();
        return leaderAndEpoch.epoch();
    }

    /**
     * Reads the election state from the quorum state store and wraps its leader id and epoch.
     *
     * @return a new {@code LeaderAndEpoch} built from the stored election state
     */
    LeaderAndEpoch currentLeaderAndEpoch() {
        ElectionState stored = quorumStateStore.readElectionState();
        return new LeaderAndEpoch(stored.leaderIdOpt, stored.epoch);
    }

    /**
     * Waits for outbound requests, answers every collected vote request with a granted vote, polls the client
     * once and asserts that the local node is the elected leader of {@code epoch}.
     *
     * @param epoch the candidate epoch expected in every vote request
     */
    void expectAndGrantVotes(int epoch) throws Exception {
        pollUntilRequest();

        List<RaftRequest.Outbound> pendingVotes = collectVoteRequests(epoch, log.lastFetchedEpoch(), log.endOffset().offset);
        for (RaftRequest.Outbound pending : pendingVotes) {
            ApiMessage granted = voteResponse(true, Optional.empty(), epoch);
            deliverResponse(pending.correlationId, pending.destinationId(), granted);
        }

        client.poll();
        assertElectedLeader(epoch, localIdOrThrow());
    }

    /**
     * @return the local id
     * @throws AssertionError if no local id is defined
     */
    private int localIdOrThrow() {
        if (!localId.isPresent()) {
            throw new AssertionError("Required local id is not defined");
        }
        return localId.getAsInt();
    }

    /**
     * Waits for outbound requests, answers each collected BeginQuorumEpoch request with a response naming the
     * local node as leader, and polls the client once.
     *
     * @param epoch the leader epoch expected in every BeginQuorumEpoch request
     */
    private void expectBeginEpoch(int epoch) throws Exception {
        pollUntilRequest();

        List<RaftRequest.Outbound> beginRequests = collectBeginEpochRequests(epoch);
        for (RaftRequest.Outbound begin : beginRequests) {
            ApiMessage reply = beginEpochResponse(epoch, localIdOrThrow());
            deliverResponse(begin.correlationId, begin.destinationId(), reply);
        }

        client.poll();
    }

    /**
     * Repeatedly polls the client and evaluates {@code condition} through {@code TestUtils.waitForCondition},
     * using a wait time of 5000.
     *
     * @param condition checked after each client poll
     */
    public void pollUntil(TestCondition condition) throws InterruptedException {
        TestCondition pollThenCheck = () -> {
            client.poll();
            return condition.conditionMet();
        };
        TestUtils.waitForCondition(pollThenCheck, 5000L, "Condition failed to be satisfied before timeout");
    }

    /**
     * Polls until at least one response has been recorded in {@code sentResponses}.
     */
    void pollUntilResponse() throws InterruptedException {
        TestCondition hasResponse = () -> sentResponses.size() > 0;
        pollUntil(hasResponse);
    }

    /**
     * Polls until the mock channel reports that it has sent requests.
     */
    void pollUntilRequest() throws InterruptedException {
        pollUntil(() -> channel.hasSentRequests());
    }

    /**
     * Asserts that the stored election state equals a voted-candidate state for the given epoch and candidate.
     */
    void assertVotedCandidate(int epoch, int leaderId) throws IOException {
        ElectionState expected = ElectionState.withVotedCandidate(epoch, leaderId, voters);
        ElectionState stored = quorumStateStore.readElectionState();
        Assertions.assertEquals(expected, stored);
    }

    /**
     * Asserts that the stored election state equals an elected-leader state for the given epoch and leader.
     */
    public void assertElectedLeader(int epoch, int leaderId) throws IOException {
        ElectionState expected = ElectionState.withElectedLeader(epoch, leaderId, voters);
        ElectionState stored = quorumStateStore.readElectionState();
        Assertions.assertEquals(expected, stored);
    }

    /**
     * Asserts that the stored election state equals an unknown-leader state for the given epoch.
     */
    void assertUnknownLeader(int epoch) throws IOException {
        ElectionState expected = ElectionState.withUnknownLeader(epoch, voters);
        ElectionState stored = quorumStateStore.readElectionState();
        Assertions.assertEquals(expected, stored);
    }

    /**
     * Asserts that the client's quorum reports the resigned state and that the stored election state still
     * equals an elected-leader state for the given epoch and leader.
     */
    void assertResignedLeader(int epoch, int leaderId) throws IOException {
        boolean resigned = client.quorum().isResigned();
        Assertions.assertTrue(resigned);

        ElectionState expected = ElectionState.withElectedLeader(epoch, leaderId, voters);
        ElectionState stored = quorumStateStore.readElectionState();
        Assertions.assertEquals(expected, stored);
    }

    /**
     * Drains the recorded DescribeQuorum responses and returns the payload of the only one.
     *
     * @return the data of the single drained DescribeQuorum response
     * @throws AssertionError (via JUnit) if the number of drained responses is not exactly one, or the payload
     *         is not a {@code DescribeQuorumResponseData}
     */
    DescribeQuorumResponseData collectDescribeQuorumResponse() {
        List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.DESCRIBE_QUORUM);
        Assertions.assertEquals(1, sentMessages.size());
        RaftResponse.Outbound raftMessage = sentMessages.get(0);
        // The drained messages are responses, so the failure message says so.
        Assertions.assertTrue(raftMessage.data() instanceof DescribeQuorumResponseData, "Unexpected response type " + raftMessage.data());
        return (DescribeQuorumResponseData) raftMessage.data();
    }

    /**
     * Asserts that the single recorded DescribeQuorum response equals a singleton response for the metadata
     * partition carrying the given leader, epoch, high watermark and replica states with error {@code NONE}.
     */
    void assertSentDescribeQuorumResponse(int leaderId, int leaderEpoch, long highWatermark, List<DescribeQuorumResponseData.ReplicaState> voterStates, List<DescribeQuorumResponseData.ReplicaState> observerStates) {
        // Collect first: this drains the recorded responses before the expectation is built.
        DescribeQuorumResponseData actual = collectDescribeQuorumResponse();

        DescribeQuorumResponseData.PartitionData expectedPartition = new DescribeQuorumResponseData.PartitionData()
            .setErrorCode(Errors.NONE.code())
            .setLeaderId(leaderId)
            .setLeaderEpoch(leaderEpoch)
            .setHighWatermark(highWatermark)
            .setCurrentVoters(voterStates)
            .setObservers(observerStates);
        DescribeQuorumResponseData expected = DescribeQuorumResponse.singletonResponse(metadataPartition, expectedPartition);

        Assertions.assertEquals(expected, actual);
    }

    /**
     * Collects the sent vote requests, asserts how many there are and returns the correlation id of the first.
     *
     * @param numVoteReceivers expected number of vote requests
     * @return correlation id of the first collected vote request
     */
    int assertSentVoteRequest(int epoch, int lastEpoch, long lastEpochOffset, int numVoteReceivers) {
        List<RaftRequest.Outbound> collected = collectVoteRequests(epoch, lastEpoch, lastEpochOffset);
        Assertions.assertEquals(numVoteReceivers, collected.size());
        RaftRequest.Outbound first = collected.iterator().next();
        return first.correlationId();
    }

    /**
     * Asserts that exactly one Vote response was recorded and that its top-level error equals {@code error}.
     */
    void assertSentVoteResponse(Errors error) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.VOTE);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof VoteResponseData);

        VoteResponseData body = (VoteResponseData) sent.data();
        Assertions.assertEquals(error, Errors.forCode(body.errorCode()));
    }

    /**
     * Asserts that exactly one Vote response was recorded for the metadata partition and checks its
     * partition-level fields.
     *
     * @param error       expected partition-level error
     * @param epoch       expected leader epoch
     * @param leaderId    expected leader id; empty is compared against {@code -1}
     * @param voteGranted expected vote-granted flag
     */
    void assertSentVoteResponse(Errors error, int epoch, OptionalInt leaderId, boolean voteGranted) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.VOTE);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof VoteResponseData);
        VoteResponseData body = (VoteResponseData) sent.data();
        Assertions.assertTrue(RaftUtil.hasValidTopicPartition(body, metadataPartition));

        VoteResponseData.TopicData topic = body.topics().get(0);
        VoteResponseData.PartitionData partition = topic.partitions().get(0);
        Assertions.assertEquals(voteGranted, partition.voteGranted());
        Assertions.assertEquals(error, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.leaderEpoch());
        Assertions.assertEquals(leaderId.orElse(-1), partition.leaderId());
    }

    /**
     * Drains the channel's send queue and returns the vote requests found in it, asserting on each one that
     * the candidate is the local node and that epoch, last epoch and last offset match the arguments.
     * Drained messages that are not vote requests are not returned.
     */
    List<RaftRequest.Outbound> collectVoteRequests(int epoch, int lastEpoch, long lastEpochOffset) {
        List<RaftRequest.Outbound> collected = new ArrayList<>();
        for (RaftRequest.Outbound outbound : channel.drainSendQueue()) {
            if (outbound.data() instanceof VoteRequestData) {
                VoteRequestData.PartitionData partition = unwrap((VoteRequestData) outbound.data());
                Assertions.assertEquals(epoch, partition.candidateEpoch());
                Assertions.assertEquals(localIdOrThrow(), partition.candidateId());
                Assertions.assertEquals(lastEpoch, partition.lastOffsetEpoch());
                Assertions.assertEquals(lastEpochOffset, partition.lastOffset());
                collected.add(outbound);
            }
        }
        return collected;
    }

    /**
     * Wraps {@code request} in an inbound request with a fresh correlation id and the current mock time,
     * registers a callback that records the eventual response in {@code sentResponses}, and hands the request
     * to the client.
     */
    void deliverRequest(ApiMessage request) {
        int correlationId = channel.newCorrelationId();
        RaftRequest.Inbound inbound = new RaftRequest.Inbound(correlationId, request, time.milliseconds());
        inbound.completion.whenComplete((response, exception) -> {
            if (exception == null) {
                sentResponses.add((RaftResponse.Outbound) response);
                return;
            }
            // NOTE(review): the stage returned by whenComplete is discarded, so this exception is
            // presumably not surfaced to the test thread - confirm against the type of `completion`.
            throw new RuntimeException(exception);
        });
        client.handle(inbound);
    }

    /**
     * Hands an inbound response with the given correlation id, payload and source id to the mock channel.
     */
    void deliverResponse(int correlationId, int sourceId, ApiMessage response) {
        RaftResponse.Inbound inbound = new RaftResponse.Inbound(correlationId, response, sourceId);
        channel.mockReceive(inbound);
    }

    /**
     * Collects the sent BeginQuorumEpoch requests, asserts how many there are and returns the correlation id
     * of the first.
     *
     * @param numBeginEpochRequests expected number of requests
     * @return correlation id of the first collected request
     */
    int assertSentBeginQuorumEpochRequest(int epoch, int numBeginEpochRequests) {
        List<RaftRequest.Outbound> collected = collectBeginEpochRequests(epoch);
        Assertions.assertEquals(numBeginEpochRequests, collected.size());
        RaftRequest.Outbound first = collected.get(0);
        return first.correlationId;
    }

    /**
     * Removes from {@code sentResponses} every response whose payload has the given API key and returns the
     * removed responses in their recorded order. Responses for other API keys stay recorded.
     */
    private List<RaftResponse.Outbound> drainSentResponses(ApiKeys apiKey) {
        List<RaftResponse.Outbound> drained = new ArrayList<>();
        for (Iterator<RaftResponse.Outbound> it = sentResponses.iterator(); it.hasNext(); ) {
            RaftResponse.Outbound candidate = it.next();
            if (candidate.data.apiKey() == apiKey.id) {
                drained.add(candidate);
                it.remove();
            }
        }
        return drained;
    }

    /**
     * Asserts that exactly one BeginQuorumEpoch response was recorded and that its top-level error equals
     * {@code responseError}.
     */
    void assertSentBeginQuorumEpochResponse(Errors responseError) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof BeginQuorumEpochResponseData);

        BeginQuorumEpochResponseData body = (BeginQuorumEpochResponseData) sent.data();
        Assertions.assertEquals(responseError, Errors.forCode(body.errorCode()));
    }

    /**
     * Asserts that exactly one BeginQuorumEpoch response was recorded with top-level error {@code NONE}, and
     * checks epoch, leader id and error of its first partition.
     *
     * @param partitionError expected partition-level error
     * @param epoch          expected leader epoch
     * @param leaderId       expected leader id; empty is compared against {@code -1}
     */
    void assertSentBeginQuorumEpochResponse(Errors partitionError, int epoch, OptionalInt leaderId) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof BeginQuorumEpochResponseData);
        BeginQuorumEpochResponseData body = (BeginQuorumEpochResponseData) sent.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(body.errorCode()));

        BeginQuorumEpochResponseData.TopicData topic = body.topics().get(0);
        BeginQuorumEpochResponseData.PartitionData partition = topic.partitions().get(0);
        Assertions.assertEquals(epoch, partition.leaderEpoch());
        Assertions.assertEquals(leaderId.orElse(-1), partition.leaderId());
        Assertions.assertEquals(partitionError, Errors.forCode(partition.errorCode()));
    }

    /**
     * Asserts that exactly one EndQuorumEpoch request was sent, addressed to {@code destinationId}, and
     * returns its correlation id. Preferred successors are not checked.
     */
    int assertSentEndQuorumEpochRequest(int epoch, int destinationId) {
        Set<Integer> expectedDestinations = Collections.singleton(destinationId);
        List<RaftRequest.Outbound> collected = collectEndQuorumRequests(epoch, expectedDestinations, Optional.empty());
        Assertions.assertEquals(1, collected.size());
        RaftRequest.Outbound only = collected.get(0);
        return only.correlationId();
    }

    /**
     * Asserts that exactly one EndQuorumEpoch response was recorded and that its top-level error equals
     * {@code responseError}.
     */
    void assertSentEndQuorumEpochResponse(Errors responseError) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.END_QUORUM_EPOCH);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof EndQuorumEpochResponseData);

        EndQuorumEpochResponseData body = (EndQuorumEpochResponseData) sent.data();
        Assertions.assertEquals(responseError, Errors.forCode(body.errorCode()));
    }

    /**
     * Asserts that exactly one EndQuorumEpoch response was recorded with top-level error {@code NONE}, and
     * checks epoch, leader id and error of its first partition.
     *
     * @param partitionError expected partition-level error
     * @param epoch          expected leader epoch
     * @param leaderId       expected leader id; empty is compared against {@code -1}
     */
    void assertSentEndQuorumEpochResponse(Errors partitionError, int epoch, OptionalInt leaderId) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.END_QUORUM_EPOCH);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof EndQuorumEpochResponseData);
        EndQuorumEpochResponseData body = (EndQuorumEpochResponseData) sent.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(body.errorCode()));

        EndQuorumEpochResponseData.TopicData topic = body.topics().get(0);
        EndQuorumEpochResponseData.PartitionData partition = topic.partitions().get(0);
        Assertions.assertEquals(epoch, partition.leaderEpoch());
        Assertions.assertEquals(leaderId.orElse(-1), partition.leaderId());
        Assertions.assertEquals(partitionError, Errors.forCode(partition.errorCode()));
    }

    /**
     * Drains the Fetch requests sent through the channel, asserts there is exactly one and returns it.
     */
    RaftRequest.Outbound assertSentFetchRequest() {
        Optional<ApiKeys> fetchOnly = Optional.of(ApiKeys.FETCH);
        List<RaftRequest.Outbound> drained = channel.drainSentRequests(fetchOnly);
        Assertions.assertEquals(1, drained.size());
        return drained.get(0);
    }

    /**
     * Drains the whole send queue, asserts it held exactly one message, validates it with
     * {@code assertFetchRequestData} and returns its correlation id.
     */
    int assertSentFetchRequest(int epoch, long fetchOffset, int lastFetchedEpoch) {
        List<RaftRequest.Outbound> drained = channel.drainSendQueue();
        Assertions.assertEquals(1, drained.size());

        RaftRequest.Outbound only = drained.get(0);
        assertFetchRequestData(only, epoch, fetchOffset, lastFetchedEpoch);
        return only.correlationId();
    }

    /**
     * Asserts that exactly one Fetch response was recorded with top-level error {@code NONE}, containing one
     * topic (named like the metadata partition's topic) with one partition, and returns that partition.
     */
    FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.FETCH);
        Assertions.assertEquals(1, drained.size(), "Found unexpected sent messages " + drained);

        RaftResponse.Outbound sent = drained.get(0);
        Assertions.assertEquals(ApiKeys.FETCH.id, sent.data.apiKey());
        FetchResponseData body = (FetchResponseData) sent.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(body.errorCode()));
        Assertions.assertEquals(1, body.responses().size());

        FetchResponseData.FetchableTopicResponse topicResponse = body.responses().get(0);
        Assertions.assertEquals(metadataPartition.topic(), topicResponse.topic());
        Assertions.assertEquals(1, topicResponse.partitions().size());
        return topicResponse.partitions().get(0);
    }

    /**
     * Asserts that exactly one Fetch response was recorded and that its top-level error equals
     * {@code topLevelError}.
     */
    void assertSentFetchPartitionResponse(Errors topLevelError) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.FETCH);
        Assertions.assertEquals(1, drained.size(), "Found unexpected sent messages " + drained);

        RaftResponse.Outbound sent = drained.get(0);
        Assertions.assertEquals(ApiKeys.FETCH.id, sent.data.apiKey());

        FetchResponseData body = (FetchResponseData) sent.data();
        Assertions.assertEquals(topLevelError, Errors.forCode(body.errorCode()));
    }

    /**
     * Asserts on the single recorded Fetch partition response: partition error, current leader epoch and id,
     * and that both the diverging epoch and the snapshot id carry {@code -1} for end offset and epoch.
     *
     * @param error    expected partition-level error
     * @param epoch    expected current leader epoch
     * @param leaderId expected current leader id; empty is compared against {@code -1}
     * @return the records of the partition response
     */
    MemoryRecords assertSentFetchPartitionResponse(Errors error, int epoch, OptionalInt leaderId) {
        FetchResponseData.PartitionData partition = assertSentFetchPartitionResponse();

        Assertions.assertEquals(error, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(leaderId.orElse(-1), partition.currentLeader().leaderId());

        // No divergence and no snapshot are expected in this response.
        Assertions.assertEquals(-1L, partition.divergingEpoch().endOffset());
        Assertions.assertEquals(-1, partition.divergingEpoch().epoch());
        Assertions.assertEquals(-1L, partition.snapshotId().endOffset());
        Assertions.assertEquals(-1, partition.snapshotId().epoch());

        return (MemoryRecords) partition.records();
    }

    /**
     * Asserts on the single recorded Fetch partition response: error {@code NONE}, current leader epoch,
     * high watermark, and that both the diverging epoch and the snapshot id carry {@code -1} for end offset
     * and epoch.
     *
     * @param highWatermark expected high watermark
     * @param leaderEpoch   expected current leader epoch
     * @return the records of the partition response
     */
    MemoryRecords assertSentFetchPartitionResponse(long highWatermark, int leaderEpoch) {
        FetchResponseData.PartitionData partition = assertSentFetchPartitionResponse();

        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(leaderEpoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(highWatermark, partition.highWatermark());

        // No divergence and no snapshot are expected in this response.
        Assertions.assertEquals(-1L, partition.divergingEpoch().endOffset());
        Assertions.assertEquals(-1, partition.divergingEpoch().epoch());
        Assertions.assertEquals(-1L, partition.snapshotId().endOffset());
        Assertions.assertEquals(-1, partition.snapshotId().epoch());

        return (MemoryRecords) partition.records();
    }

    /**
     * Drains the FetchSnapshot requests sent through the channel, asserts there is exactly one and returns it.
     */
    RaftRequest.Outbound assertSentFetchSnapshotRequest() {
        Optional<ApiKeys> snapshotOnly = Optional.of(ApiKeys.FETCH_SNAPSHOT);
        List<RaftRequest.Outbound> drained = channel.drainSentRequests(snapshotOnly);
        Assertions.assertEquals(1, drained.size());
        return drained.get(0);
    }

    /**
     * Asserts that exactly one FetchSnapshot response was recorded and that its top-level error equals
     * {@code responseError}.
     */
    void assertSentFetchSnapshotResponse(Errors responseError) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof FetchSnapshotResponseData);

        FetchSnapshotResponseData body = (FetchSnapshotResponseData) sent.data();
        Assertions.assertEquals(responseError, Errors.forCode(body.errorCode()));
    }

    /**
     * Asserts that exactly one FetchSnapshot response was recorded with top-level error {@code NONE} and
     * returns whatever {@code FetchSnapshotResponse.forTopicPartition} yields for {@code topicPartition}.
     */
    Optional<FetchSnapshotResponseData.PartitionSnapshot> assertSentFetchSnapshotResponse(TopicPartition topicPartition) {
        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
        Assertions.assertEquals(1, drained.size());

        RaftMessage sent = drained.get(0);
        Assertions.assertTrue(sent.data() instanceof FetchSnapshotResponseData);

        FetchSnapshotResponseData body = (FetchSnapshotResponseData) sent.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(body.errorCode()));
        return FetchSnapshotResponse.forTopicPartition(body, topicPartition);
    }

    /**
     * Drains the channel's send queue and returns the EndQuorumEpoch requests found in it.
     *
     * <p>Each request is asserted to carry the given epoch and the local node as leader, and, when
     * {@code preferredSuccessorsOpt} is present, the given preferred successors. After the loop the set of
     * destination ids seen must equal {@code destinationIdSet}. Drained messages of other types are not
     * returned.
     */
    List<RaftRequest.Outbound> collectEndQuorumRequests(int epoch, Set<Integer> destinationIdSet, Optional<List<Integer>> preferredSuccessorsOpt) {
        List<RaftRequest.Outbound> collected = new ArrayList<>();
        Set<Integer> seenDestinations = new HashSet<>();
        for (RaftRequest.Outbound outbound : channel.drainSendQueue()) {
            if (outbound.data() instanceof EndQuorumEpochRequestData) {
                EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) outbound.data();
                EndQuorumEpochRequestData.PartitionData partition = request.topics().get(0).partitions().get(0);
                Assertions.assertEquals(epoch, partition.leaderEpoch());
                Assertions.assertEquals(localIdOrThrow(), partition.leaderId());
                if (preferredSuccessorsOpt.isPresent()) {
                    Assertions.assertEquals(preferredSuccessorsOpt.get(), partition.preferredSuccessors());
                }
                seenDestinations.add(outbound.destinationId());
                collected.add(outbound);
            }
        }
        Assertions.assertEquals(destinationIdSet, seenDestinations);
        return collected;
    }

    /**
     * Waits for the first Fetch request (which must target a voter and carry epoch 0, offset 0 and last
     * fetched epoch 0), answers it with an empty, error-free fetch response naming {@code leaderId} and
     * {@code epoch}, polls the client and asserts that the leader is now recorded as elected.
     */
    void discoverLeaderAsObserver(int leaderId, int epoch) throws Exception {
        pollUntilRequest();

        RaftRequest.Outbound fetch = assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetch.destinationId()));
        assertFetchRequestData(fetch, 0, 0L, 0);

        ApiMessage reply = (ApiMessage) fetchResponse(epoch, leaderId, (Records) MemoryRecords.EMPTY, 0L, Errors.NONE);
        deliverResponse(fetch.correlationId, fetch.destinationId(), reply);

        client.poll();
        assertElectedLeader(epoch, leaderId);
    }

    /**
     * Drains the BeginQuorumEpoch requests sent through the channel and returns them, asserting on each one
     * that it carries the given epoch and names the local node as leader.
     */
    private List<RaftRequest.Outbound> collectBeginEpochRequests(int epoch) {
        List<RaftRequest.Outbound> drained = channel.drainSentRequests(Optional.of(ApiKeys.BEGIN_QUORUM_EPOCH));
        List<RaftRequest.Outbound> collected = new ArrayList<>();
        for (RaftRequest.Outbound outbound : drained) {
            Assertions.assertTrue(outbound.data() instanceof BeginQuorumEpochRequestData);
            BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) outbound.data();
            BeginQuorumEpochRequestData.TopicData topic = request.topics().get(0);
            BeginQuorumEpochRequestData.PartitionData partition = topic.partitions().get(0);
            Assertions.assertEquals(epoch, partition.leaderEpoch());
            Assertions.assertEquals(localIdOrThrow(), partition.leaderId());
            collected.add(outbound);
        }
        return collected;
    }

    /**
     * Builds an address spec for {@code localhost} on port {@code 9990 + id}.
     */
    private static RaftConfig.AddressSpec mockAddress(int id) {
        int port = 9990 + id;
        InetSocketAddress address = new InetSocketAddress("localhost", port);
        return new RaftConfig.InetAddressSpec(address);
    }

    /**
     * Builds an error-free EndQuorumEpoch response for the metadata partition.
     *
     * @param leaderId leader id to report; empty is sent as {@code -1}
     */
    EndQuorumEpochResponseData endEpochResponse(int epoch, OptionalInt leaderId) {
        int leader = leaderId.orElse(-1);
        return EndQuorumEpochResponse.singletonResponse(Errors.NONE, metadataPartition, Errors.NONE, epoch, leader);
    }

    /**
     * Builds an EndQuorumEpoch request for the metadata partition without an explicit cluster id.
     */
    EndQuorumEpochRequestData endEpochRequest(int epoch, int leaderId, List<Integer> preferredSuccessors) {
        TopicPartition partition = metadataPartition;
        return EndQuorumEpochRequest.singletonRequest(partition, epoch, leaderId, preferredSuccessors);
    }

    /**
     * Builds an EndQuorumEpoch request for the metadata partition carrying the given cluster id.
     */
    EndQuorumEpochRequestData endEpochRequest(String clusterId, int epoch, int leaderId, List<Integer> preferredSuccessors) {
        TopicPartition partition = metadataPartition;
        return EndQuorumEpochRequest.singletonRequest(partition, clusterId, epoch, leaderId, preferredSuccessors);
    }

    /**
     * Builds a BeginQuorumEpoch request for the metadata partition carrying the given cluster id.
     */
    BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) {
        TopicPartition partition = metadataPartition;
        return BeginQuorumEpochRequest.singletonRequest(partition, clusterId, epoch, leaderId);
    }

    /**
     * Builds a BeginQuorumEpoch request for the metadata partition without an explicit cluster id.
     */
    BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId) {
        TopicPartition partition = metadataPartition;
        return BeginQuorumEpochRequest.singletonRequest(partition, epoch, leaderId);
    }

    /**
     * Builds an error-free BeginQuorumEpoch response for the metadata partition.
     */
    private BeginQuorumEpochResponseData beginEpochResponse(int epoch, int leaderId) {
        Errors topLevel = Errors.NONE;
        Errors partitionLevel = Errors.NONE;
        return BeginQuorumEpochResponse.singletonResponse(topLevel, metadataPartition, partitionLevel, epoch, leaderId);
    }

    /**
     * Builds a Vote request for the metadata partition using this context's cluster id.
     */
    VoteRequestData voteRequest(int epoch, int candidateId, int lastEpoch, long lastEpochOffset) {
        String ownClusterId = this.clusterId.toString();
        return VoteRequest.singletonRequest(metadataPartition, ownClusterId, epoch, candidateId, lastEpoch, lastEpochOffset);
    }

    /**
     * Builds a Vote request for the metadata partition carrying the given cluster id.
     */
    VoteRequestData voteRequest(String clusterId, int epoch, int candidateId, int lastEpoch, long lastEpochOffset) {
        TopicPartition partition = metadataPartition;
        return VoteRequest.singletonRequest(partition, clusterId, epoch, candidateId, lastEpoch, lastEpochOffset);
    }

    /**
     * Builds an error-free Vote response for the metadata partition.
     *
     * @param leaderId leader id to report; empty is sent as {@code -1}
     */
    VoteResponseData voteResponse(boolean voteGranted, Optional<Integer> leaderId, int epoch) {
        int leader = leaderId.orElse(-1);
        return VoteResponse.singletonResponse(Errors.NONE, metadataPartition, Errors.NONE, epoch, leader, voteGranted);
    }

    /**
     * Extracts the first partition of the first topic from a Vote request,
     * after asserting that {@code RaftUtil.hasValidTopicPartition} accepts the
     * request for the metadata partition.
     *
     * @param voteRequest request to unwrap
     * @return the first partition entry of the first topic entry
     */
    private VoteRequestData.PartitionData unwrap(VoteRequestData voteRequest) {
        boolean validPartition = RaftUtil.hasValidTopicPartition(voteRequest, this.metadataPartition);
        Assertions.assertTrue(validPartition);
        VoteRequestData.TopicData firstTopic = voteRequest.topics().get(0);
        return firstTopic.partitions().get(0);
    }

    /**
     * Asserts that {@code actual} holds exactly the expected records, in order,
     * comparing each record value decoded with {@code Utils.utf8} against the
     * expected string.
     *
     * @param expected expected record values, in order
     * @param actual   records to check
     */
    static void assertMatchingRecords(String[] expected, Records actual) {
        // Typed list instead of the raw List + per-element cast.
        List<Record> recordList = Utils.toList(actual.records());
        Assertions.assertEquals(expected.length, recordList.size());
        for (int i = 0; i < expected.length; ++i) {
            Record record = recordList.get(i);
            String actualValue = Utils.utf8(record.value());
            Assertions.assertEquals(expected[i], actualValue, "Record at offset " + record.offset() + " does not match expected");
        }
    }

    /**
     * Asserts that a control record is a leader change message with the
     * expected contents.
     *
     * <p>The voter list is compared as an ordered list, while the granting
     * voters are compared as sets, so their order is not checked.
     *
     * @param leaderId       expected leader id in the message
     * @param voters         expected voter ids, in order
     * @param grantingVoters expected granting voter ids, in any order
     * @param recordKey      control record key; must parse to {@code LEADER_CHANGE}
     * @param recordValue    control record value holding the serialized message
     */
    static void verifyLeaderChangeMessage(int leaderId, List<Integer> voters, List<Integer> grantingVoters, ByteBuffer recordKey, ByteBuffer recordValue) {
        Assertions.assertEquals(ControlRecordType.LEADER_CHANGE, ControlRecordType.parse(recordKey));
        LeaderChangeMessage leaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage(recordValue);
        Assertions.assertEquals(leaderId, leaderChangeMessage.leaderId());

        List<LeaderChangeMessage.Voter> expectedVoters = voters.stream()
            .map(voterId -> new LeaderChangeMessage.Voter().setVoterId(voterId))
            .collect(Collectors.toList());
        Assertions.assertEquals(expectedVoters, leaderChangeMessage.voters());

        Set<LeaderChangeMessage.Voter> expectedGrantingVoters = grantingVoters.stream()
            .map(voterId -> new LeaderChangeMessage.Voter().setVoterId(voterId))
            .collect(Collectors.toSet());
        // Typed HashSet instead of the raw constructor call.
        Set<LeaderChangeMessage.Voter> actualGrantingVoters = new HashSet<>(leaderChangeMessage.grantingVoters());
        Assertions.assertEquals(expectedGrantingVoters, actualGrantingVoters);
    }

    /**
     * Asserts that an outbound message is a Fetch request with the expected
     * shape and contents.
     *
     * <p>Checks a max-bytes value of 8 MiB, a max wait of 0, exactly one topic
     * (the metadata topic) with exactly one partition, the given epoch, fetch
     * offset and last fetched epoch, and a replica id equal to the local id
     * (or {@code -1} when there is no local id).
     *
     * <p>When the local node is one of the voters, the fetch offset must also
     * equal the log's first unflushed offset. Otherwise the log must not
     * report a flush since the last check.
     *
     * @param message          outbound request to inspect
     * @param epoch            expected current leader epoch
     * @param fetchOffset      expected fetch offset
     * @param lastFetchedEpoch expected last fetched epoch
     */
    void assertFetchRequestData(RaftRequest.Outbound message, int epoch, long fetchOffset, int lastFetchedEpoch) {
        Assertions.assertTrue(message.data() instanceof FetchRequestData, "unexpected request type " + message.data());
        FetchRequestData request = (FetchRequestData) message.data();
        Assertions.assertEquals(8 * 1024 * 1024, request.maxBytes());
        Assertions.assertEquals(0, request.maxWaitMs());

        // Assert each size before indexing so a wrong shape fails as an assertion.
        Assertions.assertEquals(1, request.topics().size());
        FetchRequestData.FetchTopic fetchTopic = request.topics().get(0);
        Assertions.assertEquals(this.metadataPartition.topic(), fetchTopic.topic());
        Assertions.assertEquals(1, fetchTopic.partitions().size());
        FetchRequestData.FetchPartition fetchPartition = fetchTopic.partitions().get(0);

        Assertions.assertEquals(epoch, fetchPartition.currentLeaderEpoch());
        Assertions.assertEquals(fetchOffset, fetchPartition.fetchOffset());
        Assertions.assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
        Assertions.assertEquals(this.localId.orElse(-1), request.replicaState().replicaId());

        boolean localIsVoter = this.localId.isPresent() && this.voters.contains(this.localId.getAsInt());
        if (localIsVoter) {
            long firstUnflushedOffset = this.log.firstUnflushedOffset();
            // Argument order follows the placeholders: fetch offset first, unflushed offset second.
            Assertions.assertEquals(firstUnflushedOffset, fetchOffset, String.format("expected voters have the fetch offset (%s) be the same as the unflushed offset (%s)", fetchOffset, firstUnflushedOffset));
        } else {
            Assertions.assertFalse(this.log.flushedSinceLastChecked(), "KRaft client should not explicitly flush when it is an observer");
        }
    }

    /**
     * Builds a Fetch request using this context's own cluster id, delegating
     * to the overload that takes an explicit cluster id.
     *
     * @param epoch            current leader epoch for the fetch partition
     * @param replicaId        replica id placed in the replica state
     * @param fetchOffset      fetch offset for the fetch partition
     * @param lastFetchedEpoch last fetched epoch for the fetch partition
     * @param maxWaitTimeMs    max wait set on the request
     * @return the constructed request
     */
    FetchRequestData fetchRequest(int epoch, int replicaId, long fetchOffset, int lastFetchedEpoch, int maxWaitTimeMs) {
        String ownClusterId = this.clusterId.toString();
        return this.fetchRequest(epoch, ownClusterId, replicaId, fetchOffset, lastFetchedEpoch, maxWaitTimeMs);
    }

    /**
     * Builds a Fetch request for the metadata partition and topic id.
     *
     * @param epoch            current leader epoch set on the fetch partition
     * @param clusterId        cluster id set on the request
     * @param replicaId        replica id set on a fresh replica state
     * @param fetchOffset      fetch offset set on the fetch partition
     * @param lastFetchedEpoch last fetched epoch set on the fetch partition
     * @param maxWaitTimeMs    max wait set on the request
     * @return the constructed request
     */
    FetchRequestData fetchRequest(int epoch, String clusterId, int replicaId, long fetchOffset, int lastFetchedEpoch, int maxWaitTimeMs) {
        FetchRequestData request = RaftUtil.singletonFetchRequest(
            this.metadataPartition,
            this.metadataTopicId,
            partition -> partition
                .setCurrentLeaderEpoch(epoch)
                .setLastFetchedEpoch(lastFetchedEpoch)
                .setFetchOffset(fetchOffset));
        FetchRequestData.ReplicaState replicaState = new FetchRequestData.ReplicaState().setReplicaId(replicaId);
        return request
            .setMaxWaitMs(maxWaitTimeMs)
            .setClusterId(clusterId)
            .setReplicaState(replicaState);
    }

    /**
     * Builds a Fetch response for the metadata partition and topic id.
     *
     * <p>{@code Errors.NONE} is passed to the response factory itself, while
     * the supplied {@code error} is written as the partition's error code.
     *
     * @param epoch         leader epoch set on the partition's current leader
     * @param leaderId      leader id set on the partition's current leader
     * @param records       records set on the partition
     * @param highWatermark high watermark set on the partition
     * @param error         error whose code is set on the partition
     * @return the constructed response
     */
    FetchResponseData fetchResponse(int epoch, int leaderId, Records records, long highWatermark, Errors error) {
        return RaftUtil.singletonFetchResponse(this.metadataPartition, this.metadataTopicId, Errors.NONE, partition -> {
            partition
                .setRecords(records)
                .setErrorCode(error.code())
                .setHighWatermark(highWatermark);
            partition.currentLeader()
                .setLeaderEpoch(epoch)
                .setLeaderId(leaderId);
        });
    }

    /**
     * Builds a Fetch response for the metadata partition and topic id that
     * carries a diverging epoch. No records or partition error code are set.
     *
     * @param epoch                   leader epoch set on the partition's current leader
     * @param leaderId                leader id set on the partition's current leader
     * @param divergingEpochEndOffset end offset set on the diverging epoch
     * @param divergingEpoch          epoch set on the diverging epoch
     * @param highWatermark           high watermark set on the partition
     * @return the constructed response
     */
    FetchResponseData divergingFetchResponse(int epoch, int leaderId, long divergingEpochEndOffset, int divergingEpoch, long highWatermark) {
        return RaftUtil.singletonFetchResponse(this.metadataPartition, this.metadataTopicId, Errors.NONE, partition -> {
            partition.setHighWatermark(highWatermark);
            partition.currentLeader()
                .setLeaderEpoch(epoch)
                .setLeaderId(leaderId);
            partition.divergingEpoch()
                .setEpoch(divergingEpoch)
                .setEndOffset(divergingEpochEndOffset);
        });
    }

    /**
     * Drives the local leader's high watermark up to its current log end offset.
     *
     * <p>Asserts that the local node is the current leader. It then delivers,
     * for every other voter, a Fetch request at the local log end offset,
     * polls for the response and asserts it was sent without error. Finally it
     * polls until the client reports a high watermark equal to that offset.
     *
     * @throws InterruptedException declared by this method; presumably
     *         propagated from the polling helpers — confirm against their signatures
     */
    public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
        Assertions.assertEquals(this.localId, this.currentLeader());
        long localLogEndOffset = this.log.endOffset().offset;

        // Typed set and for-each instead of a raw Set/Iterator with manual casts.
        Set<Integer> followers = this.voters.stream()
            .filter(voter -> voter != this.localId.getAsInt())
            .collect(Collectors.toSet());
        for (int follower : followers) {
            this.deliverRequest(this.fetchRequest(this.currentEpoch(), follower, localLogEndOffset, this.currentEpoch(), 0));
            this.pollUntilResponse();
            this.assertSentFetchPartitionResponse(Errors.NONE, this.currentEpoch(), this.localId);
        }

        this.pollUntil(() -> OptionalLong.of(localLogEndOffset).equals(this.client.highWatermark()));
    }

    static class MockListener
    implements RaftClient.Listener<String> {
        private final List<Batch<String>> commits = new ArrayList<Batch<String>>();
        private final List<BatchReader<String>> savedBatches = new ArrayList<BatchReader<String>>();
        private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap<Integer, Long>();
        private LeaderAndEpoch currentLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
        private final OptionalInt localId;
        private Optional<SnapshotReader<String>> snapshot = Optional.empty();
        private boolean readCommit = true;

        MockListener(OptionalInt localId) {
            this.localId = localId;
        }

        int numCommittedBatches() {
            return this.commits.size();
        }

        Long claimedEpochStartOffset(int epoch) {
            return this.claimedEpochStartOffsets.get(epoch);
        }

        LeaderAndEpoch currentLeaderAndEpoch() {
            return this.currentLeaderAndEpoch;
        }

        List<Batch<String>> committedBatches() {
            return this.commits;
        }

        Batch<String> lastCommit() {
            if (this.commits.isEmpty()) {
                return null;
            }
            return this.commits.get(this.commits.size() - 1);
        }

        OptionalLong lastCommitOffset() {
            if (this.commits.isEmpty()) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(this.commits.get(this.commits.size() - 1).lastOffset());
        }

        OptionalInt currentClaimedEpoch() {
            if (this.localId.isPresent() && this.currentLeaderAndEpoch.isLeader(this.localId.getAsInt())) {
                return OptionalInt.of(this.currentLeaderAndEpoch.epoch());
            }
            return OptionalInt.empty();
        }

        List<String> commitWithLastOffset(long lastOffset) {
            return this.commits.stream().filter(batch -> batch.lastOffset() == lastOffset).findFirst().map(batch -> batch.records()).orElse(null);
        }

        Optional<SnapshotReader<String>> drainHandledSnapshot() {
            Optional<SnapshotReader<String>> temp = this.snapshot;
            this.snapshot = Optional.empty();
            return temp;
        }

        void updateReadCommit(boolean readCommit) {
            this.readCommit = readCommit;
            if (readCommit) {
                for (BatchReader<String> batch : this.savedBatches) {
                    this.readBatch(batch);
                }
                this.savedBatches.clear();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void readBatch(BatchReader<String> reader) {
            try {
                while (reader.hasNext()) {
                    long nextOffset = this.lastCommitOffset().isPresent() ? this.lastCommitOffset().getAsLong() + 1L : 0L;
                    Batch batch = (Batch)reader.next();
                    Assertions.assertTrue((batch.baseOffset() >= nextOffset ? 1 : 0) != 0, (String)("Received non-monotonic commit " + batch + ". We expected an offset at least as large as " + nextOffset));
                    this.commits.add((Batch<String>)batch);
                }
            }
            finally {
                reader.close();
            }
        }

        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            this.currentLeaderAndEpoch = leaderAndEpoch;
            this.currentClaimedEpoch().ifPresent(claimedEpoch -> {
                long claimedEpochStartOffset = this.lastCommitOffset().isPresent() ? this.lastCommitOffset().getAsLong() : 0L;
                this.claimedEpochStartOffsets.put(leaderAndEpoch.epoch(), claimedEpochStartOffset);
            });
        }

        public void handleCommit(BatchReader<String> reader) {
            if (this.readCommit) {
                this.readBatch(reader);
            } else {
                this.savedBatches.add(reader);
            }
        }

        public void handleLoadSnapshot(SnapshotReader<String> reader) {
            this.snapshot.ifPresent(snapshot -> Assertions.assertDoesNotThrow(() -> ((SnapshotReader)snapshot).close()));
            this.commits.clear();
            this.savedBatches.clear();
            this.snapshot = Optional.of(reader);
        }
    }

    public static final class Builder {
        static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000;
        private static final RecordSerde<String> SERDE = new StringSerde();
        private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
        private static final int ELECTION_BACKOFF_MAX_MS = 100;
        private static final int FETCH_MAX_WAIT_MS = 0;
        private static final int FETCH_TIMEOUT_MS = 50000;
        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
        private static final int RETRY_BACKOFF_MS = 50;
        private static final int DEFAULT_APPEND_LINGER_MS = 0;
        private final MockMessageQueue messageQueue = new MockMessageQueue();
        private final MockTime time = new MockTime();
        private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
        private final MockableRandom random = new MockableRandom(1L);
        private final LogContext logContext = new LogContext();
        private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, this.logContext);
        private final Set<Integer> voters;
        private final OptionalInt localId;
        private Uuid clusterId = Uuid.randomUuid();
        private int requestTimeoutMs = 5000;
        private int electionTimeoutMs = 10000;
        private int appendLingerMs = 0;
        private MemoryPool memoryPool = MemoryPool.NONE;

        public Builder(int localId, Set<Integer> voters) {
            this(OptionalInt.of(localId), voters);
        }

        public Builder(OptionalInt localId, Set<Integer> voters) {
            this.voters = voters;
            this.localId = localId;
        }

        Builder withElectedLeader(int epoch, int leaderId) throws IOException {
            this.quorumStateStore.writeElectionState(ElectionState.withElectedLeader((int)epoch, (int)leaderId, this.voters));
            return this;
        }

        Builder withUnknownLeader(int epoch) throws IOException {
            this.quorumStateStore.writeElectionState(ElectionState.withUnknownLeader((int)epoch, this.voters));
            return this;
        }

        Builder withVotedCandidate(int epoch, int votedId) throws IOException {
            this.quorumStateStore.writeElectionState(ElectionState.withVotedCandidate((int)epoch, (int)votedId, this.voters));
            return this;
        }

        Builder updateRandom(Consumer<MockableRandom> consumer) {
            consumer.accept(this.random);
            return this;
        }

        Builder withMemoryPool(MemoryPool pool) {
            this.memoryPool = pool;
            return this;
        }

        Builder withAppendLingerMs(int appendLingerMs) {
            this.appendLingerMs = appendLingerMs;
            return this;
        }

        public Builder appendToLog(int epoch, List<String> records) {
            MemoryRecords batch = RaftClientTestContext.buildBatch(this.time.milliseconds(), this.log.endOffset().offset, epoch, records);
            this.log.appendAsLeader((Records)batch, epoch);
            this.log.flush(false);
            this.log.flushedSinceLastChecked();
            return this;
        }

        Builder withEmptySnapshot(OffsetAndEpoch snapshotId) throws IOException {
            try (RawSnapshotWriter snapshot = this.log.storeSnapshot(snapshotId).get();){
                snapshot.freeze();
            }
            return this;
        }

        Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) throws IOException {
            if (snapshotId.offset() > this.log.highWatermark().offset) {
                this.log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset()));
            }
            this.log.deleteBeforeSnapshot(snapshotId);
            return this;
        }

        Builder withElectionTimeoutMs(int electionTimeoutMs) {
            this.electionTimeoutMs = electionTimeoutMs;
            return this;
        }

        Builder withRequestTimeoutMs(int requestTimeoutMs) {
            this.requestTimeoutMs = requestTimeoutMs;
            return this;
        }

        Builder withClusterId(Uuid clusterId) {
            this.clusterId = clusterId;
            return this;
        }

        public RaftClientTestContext build() throws IOException {
            Metrics metrics = new Metrics((Time)this.time);
            MockNetworkChannel channel = new MockNetworkChannel(this.voters);
            MockListener listener = new MockListener(this.localId);
            Map<Integer, RaftConfig.AddressSpec> voterAddressMap = this.voters.stream().collect(Collectors.toMap(id -> id, x$0 -> RaftClientTestContext.mockAddress(x$0)));
            RaftConfig raftConfig = new RaftConfig(voterAddressMap, this.requestTimeoutMs, 50, this.electionTimeoutMs, 100, 50000, this.appendLingerMs);
            KafkaRaftClient client = new KafkaRaftClient(SERDE, (NetworkChannel)channel, (RaftMessageQueue)this.messageQueue, (ReplicatedLog)this.log, this.quorumStateStore, this.memoryPool, (Time)this.time, metrics, (ExpirationService)new MockExpirationService(this.time), 0, this.clusterId.toString(), this.localId, this.logContext, (Random)this.random, raftConfig);
            client.register((RaftClient.Listener)listener);
            client.initialize();
            RaftClientTestContext context = new RaftClientTestContext(this.clusterId, this.localId, client, this.log, channel, this.messageQueue, this.time, this.quorumStateStore, this.voters, metrics, listener);
            context.electionTimeoutMs = this.electionTimeoutMs;
            context.requestTimeoutMs = this.requestTimeoutMs;
            context.appendLingerMs = this.appendLingerMs;
            return context;
        }

        static /* synthetic */ TopicPartition access$100() {
            return METADATA_PARTITION;
        }
    }
}

