/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

public class KafkaRaftClientTest {
    /**
     * A quorum whose only voter is the local node is expected to report the local
     * node as elected leader in epoch 1 as soon as the test context has been built.
     */
    @Test
    public void testInitializeSingleMemberQuorum() throws IOException {
        final int localId = 0;
        Set<Integer> soleVoter = Collections.singleton(localId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, soleVoter).build();

        context.assertElectedLeader(1, localId);

        // The client must report the same end offset as the underlying log.
        long logEndOffset = context.log.endOffset().offset;
        Assertions.assertEquals(logEndOffset, context.client.logEndOffset());
    }

    /**
     * Builds a single-voter quorum whose stored state says the local node was leader
     * in epoch 2, then checks that after polling the node is leader in epoch 3 and the
     * log holds exactly one record written in that new epoch.
     */
    @Test
    public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
        final int localId = 0;
        final int initialEpoch = 2;
        final int expectedEpoch = initialEpoch + 1;
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId))
            .withElectedLeader(initialEpoch, localId)
            .build();

        // Poll until the first record shows up in the log.
        context.pollUntil(() -> context.log.endOffset().offset == 1L);

        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(expectedEpoch, context.log.lastFetchedEpoch());

        LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(localId), expectedEpoch);
        Assertions.assertEquals(expectedLeaderAndEpoch, context.currentLeaderAndEpoch());
        context.assertElectedLeader(expectedEpoch, localId);
    }

    /**
     * A node restored from a stored "local node is leader in {@code epoch}" state must
     * not grant a vote to another voter asking for a vote in that same epoch.
     */
    @Test
    public void testRejectVotesFromSameEpochAfterResigningLeadership() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;

        // NOTE(review): mockNextInt(10000, 0) presumably pins a randomized timeout component to 0 — confirm.
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch, localId);

        // Vote request from the other voter for the epoch the local node already leads.
        context.deliverRequest(context.voteRequest(epoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.pollUntilResponse();

        // The response names the local node as leader; the trailing flag is expected to be false.
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);
    }

    /**
     * A node restored from a stored "voted for itself in {@code epoch}" state must not
     * grant a vote to a different voter asking for a vote in that same epoch.
     */
    @Test
    public void testRejectVotesFromSameEpochAfterResigningCandidacy() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;

        // NOTE(review): mockNextInt(10000, 0) presumably pins a randomized timeout component to 0 — confirm.
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withVotedCandidate(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertVotedCandidate(epoch, localId);

        // Competing vote request for the same epoch.
        context.deliverRequest(context.voteRequest(epoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.pollUntilResponse();

        // The response carries no leader id; the trailing flag is expected to be false.
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
    }

    /**
     * A node that starts in the resigned state (restored as leader of {@code epoch})
     * moves to the voted state for the remote candidate when it receives a vote
     * request for {@code epoch + 1}.
     */
    @Test
    public void testGrantVotesFromHigherEpochAfterResigningLeadership() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;
        int nextEpoch = epoch + 1;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .build();

        // Precondition: the restored leader is in the resigned state with an empty log.
        Assertions.assertTrue(context.client.quorum().isResigned());
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch, localId);

        context.deliverRequest(context.voteRequest(nextEpoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isVoted());
        context.assertVotedCandidate(nextEpoch, remoteId);
        context.assertSentVoteResponse(Errors.NONE, nextEpoch, OptionalInt.empty(), true);
    }

    /**
     * A node that starts as a candidate in {@code epoch} (restored with a vote for
     * itself) moves to the voted state for the remote candidate when it receives a
     * vote request for {@code epoch + 1}.
     */
    @Test
    public void testGrantVotesFromHigherEpochAfterResigningCandidacy() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;
        int nextEpoch = epoch + 1;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withVotedCandidate(epoch, localId)
            .build();

        // Precondition: the node is a candidate with an empty log.
        Assertions.assertTrue(context.client.quorum().isCandidate());
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertVotedCandidate(epoch, localId);

        context.deliverRequest(context.voteRequest(nextEpoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isVoted());
        context.assertVotedCandidate(nextEpoch, remoteId);
        context.assertSentVoteResponse(Errors.NONE, nextEpoch, OptionalInt.empty(), true);
    }

    /**
     * A leader that has begun shutting down still moves to the voted state for a
     * remote candidate that requests a vote in the next epoch.
     */
    @Test
    public void testGrantVotesWhenShuttingDown() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;
        int nextEpoch = epoch + 1;

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Start the shutdown; the client must report that it is shutting down.
        context.client.shutdown(1000);
        Assertions.assertTrue(context.client.isShuttingDown());

        context.deliverRequest(context.voteRequest(nextEpoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isVoted());
        context.assertVotedCandidate(nextEpoch, remoteId);
        context.assertSentVoteResponse(Errors.NONE, nextEpoch, OptionalInt.empty(), true);
    }

    /**
     * A node that starts in the resigned state becomes a candidate in the next epoch,
     * voting for itself, once the election timeout has elapsed.
     */
    @Test
    public void testInitializeAsResignedAndBecomeCandidate() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .build();

        // Precondition: restored leader is resigned and the log is empty.
        Assertions.assertTrue(context.client.quorum().isResigned());
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch, localId);

        // Let the election timeout elapse and give the client a chance to react.
        context.time.sleep(context.electionTimeoutMs());
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isCandidate());
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * A node restored as leader of {@code epoch} rejects appends with
     * {@link NotLeaderException}, sends an EndQuorumEpoch request to the other voter,
     * and after the election timeout starts an election for {@code epoch + 1}.
     */
    @Test
    public void testInitializeAsResignedLeaderFromStateStore() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch, localId);
        context.client.poll();

        // Appends are refused even though the stored state names the local node as leader.
        Assertions.assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));

        // The other voter is told that the epoch has ended, and it answers.
        context.pollUntilRequest();
        int correlationId = context.assertSentEndQuorumEpochRequest(epoch, remoteId);
        context.deliverResponse(correlationId, remoteId, context.endEpochResponse(epoch, OptionalInt.of(localId)));
        context.client.poll();

        // After the election timeout the node stands for election in the next epoch.
        context.time.sleep(context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch + 1, localId);
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
    }

    /**
     * Scheduling an append on a node whose stored state has no known leader for
     * {@code epoch} must fail with {@link NotLeaderException}.
     */
    @Test
    public void testAppendFailedWithNotLeaderException() throws Exception {
        int localId = 0;
        int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(localId, remoteId);
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();

        Assertions.assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));
    }

    /**
     * When the memory pool cannot supply the 0x800000-byte (8 MiB) buffer, scheduling
     * an append on the leader must fail with {@link BufferAllocationException}.
     */
    @Test
    public void testAppendFailedWithBufferAllocationException() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer leaderBuffer = ByteBuffer.allocate(256);
        // The large allocation fails; the 256-byte one succeeds.
        // NOTE(review): the 256-byte buffer is presumably needed while becoming leader — confirm.
        Mockito.when(memoryPool.tryAllocate(0x800000)).thenReturn(null);
        Mockito.when(memoryPool.tryAllocate(256)).thenReturn(leaderBuffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withMemoryPool(memoryPool)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        Assertions.assertThrows(BufferAllocationException.class, () -> context.client.scheduleAppend(epoch, Collections.singletonList("a")));
    }

    /**
     * On a leader, scheduling an append with an epoch above the current one must throw
     * {@link IllegalArgumentException}; with an epoch below it, {@link NotLeaderException}.
     */
    @Test
    public void testAppendFailedWithFencedEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // Epoch from the future.
        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.scheduleAppend(epoch + 1, Collections.singletonList("a")));
        // Stale epoch.
        Assertions.assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch - 1, Collections.singletonList("a")));
    }

    /**
     * Scheduling an atomic append of 0x100001 (1 MiB + 1) one-character records on the
     * leader must fail with {@link RecordBatchTooLargeException}.
     */
    @Test
    public void testAppendFailedWithRecordBatchTooLargeException() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // Build a batch with one record more than 1 MiB worth of entries.
        int size = 0x100001;
        List<String> batchTooLarge = new ArrayList<>(size + 1);
        for (int i = 0; i < size; ++i) {
            batchTooLarge.add("a");
        }

        Assertions.assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchTooLarge));
    }

    /**
     * A node restored as leader sends EndQuorumEpoch requests to both other voters.
     * After one voter answers, nothing further is sent until 6000 ms have passed
     * (request timeout is configured as 5000 ms), at which point the request is
     * retried only towards the voter that has not answered.
     */
    @Test
    public void testEndQuorumEpochRetriesWhileResigned() throws Exception {
        int localId = 0;
        int voter1 = 1;
        int voter2 = 2;
        Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
        int epoch = 19;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectionTimeoutMs(10000)
            .withRequestTimeoutMs(5000)
            .withElectedLeader(epoch, localId)
            .build();

        // Initial round: one request per remote voter.
        context.pollUntilRequest();
        List<RaftRequest.Outbound> requests = context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2), Optional.empty());
        Assertions.assertEquals(2, requests.size());

        // Answer only the first request; no new request should be queued afterwards.
        RaftRequest.Outbound endEpochOutbound = requests.get(0);
        context.deliverResponse(endEpochOutbound.correlationId, endEpochOutbound.destinationId(), context.endEpochResponse(epoch, OptionalInt.of(localId)));
        context.client.poll();
        Assertions.assertEquals(Collections.emptyList(), context.channel.drainSendQueue());

        // Once more than the request timeout has elapsed, only the silent voter is retried.
        int nonRespondedId = requests.get(1).destinationId();
        context.time.sleep(6000L);
        context.pollUntilRequest();
        List<RaftRequest.Outbound> retries = context.collectEndQuorumRequests(epoch, Utils.mkSet(nonRespondedId), Optional.empty());
        Assertions.assertEquals(1, retries.size());
    }

    /**
     * A fetch request that is pending on the leader is answered with
     * NOT_LEADER_OR_FOLLOWER once the leader starts shutting down and resigns; after
     * the shutdown timeout the client is neither running nor shutting down.
     */
    @Test
    public void testResignWillCompleteFetchPurgatory() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // Fetch from the log end with a 1000 ms max wait.
        // NOTE(review): presumably this parks the request until data arrives — confirm.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, context.log.endOffset().offset, epoch, 1000));
        context.client.poll();

        // Append directly to the log, then begin shutdown.
        context.log.appendAsLeader(context.buildBatch(context.log.endOffset().offset, epoch, Collections.singletonList("raft")), epoch);
        context.client.shutdown(1000);
        context.client.poll();

        context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch, OptionalInt.of(localId));
        context.assertResignedLeader(epoch, localId);

        // Let the shutdown timeout elapse.
        context.time.sleep(1000L);
        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertFalse(context.client.isShuttingDown());
    }

    /**
     * A resign call that names an epoch older than the current one has no effect: the
     * node is still the elected leader of the current epoch even after twice the
     * election timeout has elapsed.
     */
    @Test
    public void testResignInOlderEpochIgnored() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int currentEpoch = context.currentEpoch();
        context.client.resign(currentEpoch - 1);
        context.client.poll();

        // Had the resignation taken effect, an election would be expected by now.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.client.poll();

        context.assertElectedLeader(currentEpoch, localId);
    }

    /**
     * After the leader resigns on request, a BeginQuorumEpoch request from another
     * voter for the next epoch is accepted: that voter becomes the elected leader and
     * the listener is told about the new leader and epoch.
     */
    @Test
    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
        int localId = 0;
        int remoteId1 = 1;
        int remoteId2 = 2;
        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int resignedEpoch = context.currentEpoch();
        int nextEpoch = resignedEpoch + 1;
        context.client.resign(resignedEpoch);
        context.pollUntil(() -> context.client.quorum().isResigned());

        context.deliverRequest(context.beginEpochRequest(nextEpoch, remoteId1));
        context.pollUntilResponse();

        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
        context.assertElectedLeader(nextEpoch, remoteId1);
        Assertions.assertEquals(new LeaderAndEpoch(OptionalInt.of(remoteId1), nextEpoch), context.listener.currentLeaderAndEpoch());
    }

    /**
     * After a user-initiated resignation the node sends EndQuorumEpoch to the other
     * voter, sends nothing further shortly after the response, rejects fetches with
     * NOT_LEADER_OR_FOLLOWER, and becomes a candidate in the next epoch once twice the
     * election timeout has elapsed.
     */
    @Test
    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int resignedEpoch = context.currentEpoch();
        context.client.resign(resignedEpoch);
        context.pollUntil(() -> context.client.quorum().isResigned());

        // The resigned leader notifies the other voter, which replies without error.
        context.pollUntilRequest();
        int correlationId = context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
        EndQuorumEpochResponseData response = EndQuorumEpochResponse.singletonResponse(
            Errors.NONE, context.metadataPartition, Errors.NONE, resignedEpoch, localId);
        context.deliverResponse(correlationId, otherNodeId, response);
        context.client.poll();

        // NOTE(review): 50 ms is a constant inlined by the decompiler; it presumably mirrors a
        // backoff setting of the test context — confirm. No request may be sent in this window.
        context.time.sleep(50L);
        context.client.poll();
        Assertions.assertFalse(context.channel.hasSentRequests());

        // A fetch is refused while resigned; the response still names the local node and epoch.
        context.deliverRequest(context.fetchRequest(1, -1, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, resignedEpoch, OptionalInt.of(localId));

        // After the election timeout the node stands for election in the next epoch.
        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntil(() -> context.client.quorum().isCandidate());
        Assertions.assertEquals(resignedEpoch + 1, context.currentEpoch());
        Assertions.assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1), context.listener.currentLeaderAndEpoch());
    }

    /**
     * Resigning with an epoch greater than the leader's current epoch must throw
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testCannotResignWithLargerEpochThanCurrentEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();

        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.resign(context.currentEpoch() + 1));
    }

    /**
     * Calling resign on a voter whose current leader is another node must throw
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testCannotResignIfNotLeader() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, otherNodeId)
            .build();

        Assertions.assertEquals(OptionalInt.of(otherNodeId), context.currentLeader());
        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.resign(leaderEpoch));
    }

    /**
     * A client built without a local node id (an observer, per the test name) learns
     * the leader from a fetch response and must throw {@link IllegalStateException}
     * when asked to resign.
     */
    @Test
    public void testCannotResignIfObserver() throws Exception {
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters).build();

        // The first fetch goes to one of the voters.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        // The FENCED_LEADER_EPOCH response carries the current leader and epoch.
        context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        Assertions.assertThrows(IllegalStateException.class, () -> context.client.resign(epoch));
    }

    /**
     * A node restored with a vote for itself in epoch 2 in a three-voter quorum sends
     * a vote request to each of the two other voters.
     */
    @Test
    public void testInitializeAsCandidateFromStateStore() throws Exception {
        int localId = 0;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, 1, 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(epoch, localId)
            .build();

        context.assertVotedCandidate(epoch, localId);
        Assertions.assertEquals(0L, context.log.endOffset().offset);

        context.pollUntilRequest();
        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0L);
        Assertions.assertEquals(2, voteRequests.size());
    }

    /**
     * In a two-voter quorum the node times out, becomes candidate in epoch 1, wins the
     * election with the other voter's vote, and writes one control batch at offset 0
     * whose record is checked as the leader-change message.
     */
    @Test
    public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.assertUnknownLeader(0);

        // Election timeout expires: the node votes for itself in epoch 1.
        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(1, localId);

        // The other voter answers the vote request with "true".
        int correlationId = context.assertSentVoteRequest(1, 0, 0L, 1);
        context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));

        // Leadership shows up as the first record in the log.
        context.pollUntil(() -> context.log.endOffset().offset == 1L);
        context.assertElectedLeader(1, localId);
        long electionTimestamp = context.time.milliseconds();

        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(1L, context.log.firstUnflushedOffset());

        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, 1);

        // Inspect the first batch: a control batch stamped with the election time.
        Records records = context.log.read(0L, Isolation.UNCOMMITTED).records;
        RecordBatch batch = records.batches().iterator().next();
        Assertions.assertTrue(batch.isControlBatch());

        Record record = batch.iterator().next();
        Assertions.assertEquals(electionTimestamp, record.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(localId, Arrays.asList(localId, otherNodeId),
            Arrays.asList(otherNodeId, localId), record.key(), record.value());
    }

    /**
     * In a three-voter quorum the node times out, becomes candidate in epoch 1, wins
     * the election after a single "true" vote response from the first remote voter,
     * and writes one control batch at offset 0 whose record is checked as the
     * leader-change message.
     */
    @Test
    public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree() throws Exception {
        int localId = 0;
        int firstNodeId = 1;
        int secondNodeId = 2;
        Set<Integer> voters = Utils.mkSet(localId, firstNodeId, secondNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.assertUnknownLeader(0);

        // Election timeout expires: the node votes for itself in epoch 1.
        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(1, localId);

        // Only the first remote voter answers.
        int correlationId = context.assertSentVoteRequest(1, 0, 0L, 2);
        context.deliverResponse(correlationId, firstNodeId, context.voteResponse(true, Optional.empty(), 1));

        // Leadership shows up as the first record in the log.
        context.pollUntil(() -> context.log.endOffset().offset == 1L);
        context.assertElectedLeader(1, localId);
        long electionTimestamp = context.time.milliseconds();

        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(1L, context.log.firstUnflushedOffset());

        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, 2);

        // Inspect the first batch: a control batch stamped with the election time.
        Records records = context.log.read(0L, Isolation.UNCOMMITTED).records;
        RecordBatch batch = records.batches().iterator().next();
        Assertions.assertTrue(batch.isControlBatch());

        Record record = batch.iterator().next();
        Assertions.assertEquals(electionTimestamp, record.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(localId, Arrays.asList(localId, firstNodeId, secondNodeId),
            Arrays.asList(firstNodeId, localId), record.key(), record.value());
    }

    /**
     * A node that voted for another candidate accepts that candidate's
     * BeginQuorumEpoch request for the same epoch and treats it as elected leader.
     */
    @Test
    public void testHandleBeginQuorumRequest() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int votedCandidateEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(votedCandidateEpoch, otherNodeId)
            .build();

        context.deliverRequest(context.beginEpochRequest(votedCandidateEpoch, otherNodeId));
        context.pollUntilResponse();

        context.assertElectedLeader(votedCandidateEpoch, otherNodeId);
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, votedCandidateEpoch, OptionalInt.of(otherNodeId));
    }

    /**
     * A node restored as leader of {@code leaderEpoch} accepts a BeginQuorumEpoch
     * request from the other voter for the next epoch and treats that voter as the
     * elected leader.
     */
    @Test
    public void testHandleBeginQuorumResponse() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, localId)
            .build();

        context.deliverRequest(context.beginEpochRequest(leaderEpoch + 1, otherNodeId));
        context.pollUntilResponse();

        context.assertElectedLeader(leaderEpoch + 1, otherNodeId);
    }

    /**
     * A candidate answers an EndQuorumEpoch request from an older epoch with
     * FENCED_LEADER_EPOCH and stays a candidate in its own epoch; it only moves to the
     * next epoch after its own election timeout plus a further 100 ms have elapsed.
     */
    @Test
    public void testEndQuorumIgnoredAsCandidateIfOlderEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int jitterMs = 85;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(jitterMs))
            .withUnknownLeader(epoch - 1)
            .build();

        // Timeout plus jitter elapses: the node becomes candidate in `epoch`.
        context.time.sleep(context.electionTimeoutMs() + jitterMs);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // A stale EndQuorumEpoch request is fenced and carries no leader id.
        context.deliverRequest(context.endEpochRequest(epoch - 2, otherNodeId, Collections.singletonList(localId)));
        context.client.poll();
        context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty());

        // One millisecond short of the candidate's own timeout: nothing changes.
        context.time.sleep(context.electionTimeoutMs() + jitterMs - 1);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // At the timeout the epoch is still unchanged.
        context.time.sleep(1L);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // NOTE(review): 100 ms is a constant inlined by the decompiler; it presumably mirrors an
        // election backoff setting of the test context — confirm.
        context.time.sleep(100L);
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * A leader answers an EndQuorumEpoch request from an older epoch with
     * FENCED_LEADER_EPOCH and is still leader of its epoch one millisecond before the
     * fetch timeout elapses.
     */
    @Test
    public void testEndQuorumIgnoredAsLeaderIfOlderEpoch() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Stale request from two epochs ago.
        context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(localId, voter3)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId));

        context.time.sleep(context.fetchTimeoutMs - 1);
        context.client.poll();
        context.assertElectedLeader(epoch, localId);
    }

    /**
     * A node with no known leader in {@code epoch} that receives an EndQuorumEpoch
     * request for that epoch answers without error and, on the next poll and without
     * any time passing, becomes a candidate in {@code epoch + 1}.
     */
    @Test
    public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(context.endEpochRequest(epoch, voter2, Arrays.asList(localId, voter3)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2));

        // No sleep in between: the election starts on the very next poll.
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * After the leader has scheduled an append and then accepts another voter as
     * leader of the next epoch, the 0x800000-byte (8 MiB) buffer handed out by the
     * mocked memory pool must have been released back to the pool.
     */
    @Test
    public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(0x800000);
        ByteBuffer leaderBuffer = ByteBuffer.allocate(256);
        Mockito.when(memoryPool.tryAllocate(0x800000)).thenReturn(buffer);
        Mockito.when(memoryPool.tryAllocate(256)).thenReturn(leaderBuffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // NOTE(review): the scheduled record presumably sits in the 8 MiB buffer during the linger — confirm.
        Assertions.assertEquals(1L, context.client.scheduleAppend(epoch, Collections.singletonList("a")));

        // Another voter announces itself as leader of the next epoch.
        context.deliverRequest(context.beginEpochRequest(epoch + 1, otherNodeId));
        context.pollUntilResponse();
        context.assertElectedLeader(epoch + 1, otherNodeId);

        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * A leader with a scheduled but unflushed append receives a Vote request for
     * {@code epoch + 1} whose last offset equals the local log end offset. The test expects the
     * local node to end up having voted for the requester, and expects the mocked
     * {@link MemoryPool} to see a {@code release} of the large buffer it was stubbed to hand out.
     */
    @Test
    public void testAccumulatorClearedAfterBecomingVoted() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // Stub the pool so each requested size returns a buffer instance the test can track.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(0x800000);
        Mockito.when(memoryPool.tryAllocate(0x800000)).thenReturn(buffer);
        ByteBuffer leaderBuffer = ByteBuffer.allocate(256);
        Mockito.when(memoryPool.tryAllocate(256)).thenReturn(leaderBuffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int epoch = context.currentEpoch();
        long scheduledOffset = context.client.scheduleAppend(epoch, Collections.singletonList("a"));
        Assertions.assertEquals(1L, scheduledOffset);

        // The candidate advertises the same log end offset as the local log.
        context.deliverRequest(
            context.voteRequest(epoch + 1, otherNodeId, epoch, context.log.endOffset().offset));
        context.pollUntilResponse();

        context.assertVotedCandidate(epoch + 1, otherNodeId);
        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * A leader with a scheduled but unflushed append receives a Vote request for
     * {@code epoch + 1} that advertises last offset {@code 0}. The test expects the local node
     * to end up in {@code epoch + 1} with an unknown leader, and expects the mocked
     * {@link MemoryPool} to see a {@code release} of the large buffer it was stubbed to hand out.
     */
    @Test
    public void testAccumulatorClearedAfterBecomingUnattached() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // Stub the pool so each requested size returns a buffer instance the test can track.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(0x800000);
        Mockito.when(memoryPool.tryAllocate(0x800000)).thenReturn(buffer);
        ByteBuffer leaderBuffer = ByteBuffer.allocate(256);
        Mockito.when(memoryPool.tryAllocate(256)).thenReturn(leaderBuffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int epoch = context.currentEpoch();
        long scheduledOffset = context.client.scheduleAppend(epoch, Collections.singletonList("a"));
        Assertions.assertEquals(1L, scheduledOffset);

        // The candidate advertises offset 0; the expected outcome is an epoch bump with no vote.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 0L));
        context.pollUntilResponse();

        context.assertUnknownLeader(epoch + 1);
        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * With a 50 ms append linger, a leader schedules one record and then only polls. The test
     * expects a wakeup request on the message queue, a poll timeout equal to the remaining
     * linger time (50 ms, then 30 ms after 20 ms have elapsed), and a log end offset of 2 once
     * the full linger time has passed.
     */
    @Test
    public void testChannelWokenUpIfLingerTimeoutReachedWithoutAppend() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        int epoch = context.currentEpoch();
        long scheduledOffset = context.client.scheduleAppend(epoch, Collections.singletonList("a"));
        Assertions.assertEquals(1L, scheduledOffset);
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        // Immediately after scheduling, the whole linger interval is still outstanding.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());

        // 20 ms later, 30 ms of linger remain.
        context.time.sleep(20L);
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(30L), context.messageQueue.lastPollTimeoutMs());

        // Once the remaining 30 ms elapse, the next poll is expected to append the record.
        context.time.sleep(30L);
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);
    }

    /**
     * With a 50 ms append linger, a leader schedules one record, lets the full linger time pass,
     * and then schedules a second record. The test expects the second {@code scheduleAppend} to
     * request a wakeup again and expects the following poll to bring the log end offset to 3.
     */
    @Test
    public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        int epoch = context.currentEpoch();
        long firstOffset = context.client.scheduleAppend(epoch, Collections.singletonList("a"));
        Assertions.assertEquals(1L, firstOffset);
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        // The poll is expected to clear the wakeup flag and wait for the whole linger interval.
        context.client.poll();
        Assertions.assertFalse(context.messageQueue.wakeupRequested());
        Assertions.assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());

        // The linger time expires before the second append is scheduled.
        context.time.sleep(lingerMs);
        long secondOffset = context.client.scheduleAppend(epoch, Collections.singletonList("b"));
        Assertions.assertEquals(2L, secondOffset);
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        context.client.poll();
        Assertions.assertEquals(3L, context.log.endOffset().offset);
    }

    /**
     * A follower of {@code oldLeaderId} receives an EndQuorumEpoch request from that leader with
     * the local node as the only listed successor. The test expects a {@code NONE} response
     * naming the old leader, and after one more poll the local node must have voted for itself
     * in {@code leaderEpoch + 1}.
     */
    @Test
    public void testHandleEndQuorumRequest() throws Exception {
        int localId = 0;
        int oldLeaderId = 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, oldLeaderId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, oldLeaderId)
            .build();

        List<Integer> successors = Collections.singletonList(localId);
        context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId, successors));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));

        context.client.poll();
        context.assertVotedCandidate(leaderEpoch + 1, localId);
    }

    /**
     * A follower receives an EndQuorumEpoch request in which another voter is listed ahead of
     * the local node. The test expects the local node to keep fetching in the current epoch
     * after 1 ms, and only after a further 50 ms of mock time to send two vote requests for
     * {@code leaderEpoch + 1} and have voted for itself.
     */
    @Test
    public void testHandleEndQuorumRequestWithLowerPriorityToBecomeLeader() throws Exception {
        int localId = 0;
        int oldLeaderId = 1;
        int leaderEpoch = 2;
        int preferredNextLeader = 3;
        Set<Integer> voters = Utils.mkSet(localId, oldLeaderId, preferredNextLeader);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, oldLeaderId)
            .build();

        // The local node is second in the successor list.
        List<Integer> successors = Arrays.asList(preferredNextLeader, localId);
        context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId, successors));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));

        // Shortly afterwards the node is still fetching in the old epoch rather than campaigning.
        context.time.sleep(1L);
        context.pollUntilRequest();
        context.assertSentFetchRequest(leaderEpoch, 0L, 0);

        // NOTE(review): 50 ms looks like a compiler-inlined backoff constant - confirm.
        context.time.sleep(50L);
        context.pollUntilRequest();
        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(leaderEpoch + 1, 0, 0L);
        Assertions.assertEquals(2, voteRequests.size());
        context.assertVotedCandidate(leaderEpoch + 1, localId);
    }

    /**
     * A candidate whose first Vote request goes unanswered for the request timeout sends a
     * second Vote request. The test expects a granted response carrying the first request's
     * correlation id to leave the node a candidate, and a granted response to the retry to make
     * it the elected leader.
     */
    @Test
    public void testVoteRequestTimeout() throws Exception {
        int localId = 0;
        int epoch = 1;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.assertUnknownLeader(0);

        // Let two election timeouts pass so the node starts campaigning.
        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);
        int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // After the request timeout a second vote request is expected.
        context.time.sleep(context.requestTimeoutMs());
        context.client.poll();
        int retryCorrelationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // A late reply to the first request must not change the candidate state.
        context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // The reply to the retry completes the election.
        context.deliverResponse(retryCorrelationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
        context.client.poll();
        context.assertElectedLeader(epoch, localId);
    }

    /**
     * A voter with an unknown leader in {@code epoch} receives a Vote request for that same
     * epoch. The test expects a granted {@code NONE} response with no leader id, and expects the
     * local node to record its vote for the requester.
     */
    @Test
    public void testHandleValidVoteRequestAsFollower() throws Exception {
        int localId = 0;
        int epoch = 2;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
        context.assertVotedCandidate(epoch, otherNodeId);
    }

    /**
     * A follower that already knows the elected leader for {@code epoch} receives a Vote request
     * for the same epoch from a different voter. The test expects the vote to be refused in a
     * {@code NONE} response that names the elected leader, and the leader to be unchanged.
     */
    @Test
    public void testHandleVoteRequestAsFollowerWithElectedLeader() throws Exception {
        int localId = 0;
        int epoch = 2;
        int otherNodeId = 1;
        int electedLeaderId = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, electedLeaderId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, electedLeaderId)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(electedLeaderId), false);
        context.assertElectedLeader(epoch, electedLeaderId);
    }

    /**
     * A voter that has already voted for {@code votedCandidateId} in {@code epoch} receives a
     * Vote request for the same epoch from a different voter. The test expects the vote to be
     * refused in a {@code NONE} response with no leader id, and the recorded vote to be unchanged.
     */
    @Test
    public void testHandleVoteRequestAsFollowerWithVotedCandidate() throws Exception {
        int localId = 0;
        int epoch = 2;
        int otherNodeId = 1;
        int votedCandidateId = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, votedCandidateId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(epoch, votedCandidateId)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
        context.assertVotedCandidate(epoch, votedCandidateId);
    }

    /**
     * A voter in {@code epoch} receives a Vote request for {@code epoch - 1}. The test expects a
     * {@code FENCED_LEADER_EPOCH} response carrying the current epoch with the vote refused, and
     * the local node to remain in {@code epoch} with an unknown leader.
     */
    @Test
    public void testHandleInvalidVoteRequestWithOlderEpoch() throws Exception {
        int localId = 0;
        int epoch = 2;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();

        int staleEpoch = epoch - 1;
        context.deliverRequest(context.voteRequest(staleEpoch, otherNodeId, epoch - 2, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty(), false);
        context.assertUnknownLeader(epoch);
    }

    /**
     * A node that is not in the voter set receives a Vote request for {@code epoch + 1}. The
     * test expects an {@code INCONSISTENT_VOTER_SET} response carrying the unchanged
     * {@code epoch} with the vote refused, and the local node to remain in {@code epoch} with an
     * unknown leader.
     */
    @Test
    public void testHandleInvalidVoteRequestAsObserver() throws Exception {
        int localId = 0;
        int epoch = 2;
        int otherNodeId = 1;
        int otherNodeId2 = 2;
        // The local id is deliberately left out of the voter set.
        Set<Integer> voters = Utils.mkSet(otherNodeId, otherNodeId2);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.empty(), false);
        context.assertUnknownLeader(epoch);
    }

    /**
     * A leader receives a Vote request for its own epoch. The test expects the vote to be
     * refused in a {@code NONE} response naming the local node as leader, and the local node to
     * stay the elected leader.
     */
    @Test
    public void testLeaderIgnoreVoteRequestOnSameEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, leaderEpoch);

        context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1L));
        context.client.poll();

        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false);
        context.assertElectedLeader(leaderEpoch, localId);
    }

    /**
     * A leader appends three records and the test tracks the high watermark and the listener's
     * last commit offset as the follower's fetch position moves. A fetch at offset 1 is expected
     * to set the high watermark to 1, a repeated fetch at offset 1 to leave it there, and a
     * fetch at offset 4 to raise it to 4, after which the listener must have been handed the
     * appended records.
     */
    @Test
    public void testListenerCommitCallbackAfterLeaderWrite() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Before any follower fetch there is no high watermark.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        // The follower's first fetch at offset 1 establishes the high watermark.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        List<String> records = Arrays.asList("a", "b", "c");
        long offset = context.client.scheduleAppend(epoch, records);
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(0L), context.listener.lastCommitOffset());

        // Fetching again from offset 1 must not move the high watermark or the commit offset.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 500));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());
        Assertions.assertEquals(OptionalLong.of(0L), context.listener.lastCommitOffset());

        // Fetching from offset 4 is expected to advance the high watermark to 4.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 4L, epoch, 500));
        OptionalLong expectedHighWatermark = OptionalLong.of(4L);
        context.pollUntil(() -> context.client.highWatermark().equals(expectedHighWatermark));
        Assertions.assertEquals(records, context.listener.commitWithLastOffset(offset));
    }

    /**
     * The local log holds three batches written in epochs 1, 3 and 5. After the node becomes
     * leader, a follower fetches at offset 6 claiming last fetched epoch 2. The test expects a
     * single poll to produce a {@code NONE} fetch response that names the local leader and
     * reports a diverging epoch of 1 ending at offset 3.
     */
    @Test
    public void testLeaderImmediatelySendsDivergingEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(5)
            .appendToLog(1, Arrays.asList("a", "b", "c"))
            .appendToLog(3, Arrays.asList("d", "e", "f"))
            .appendToLog(5, Arrays.asList("g", "h", "i"))
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Epoch 2 never appears in the local log, so the follower's history does not match.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 6L, 2, 500));
        context.client.poll();

        FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
        Assertions.assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partitionResponse.currentLeader().leaderId());
        Assertions.assertEquals(1, partitionResponse.divergingEpoch().epoch());
        Assertions.assertEquals(3L, partitionResponse.divergingEpoch().endOffset());
    }

    /**
     * A node that has voted for itself in {@code leaderEpoch} receives a Vote request for the
     * same epoch from the other voter. The test expects the vote to be refused in a {@code NONE}
     * response with no leader id, and the local node's own vote to be unchanged.
     */
    @Test
    public void testCandidateIgnoreVoteRequestOnSameEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(leaderEpoch, localId)
            .build();
        context.pollUntilRequest();

        context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1L));
        context.client.poll();

        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), false);
        context.assertVotedCandidate(leaderEpoch, localId);
    }

    /**
     * A candidate whose vote is rejected is expected to stay a candidate in the same epoch until
     * {@code electionBackoffMaxMs} of mock time has elapsed (the mocked random source returns
     * 85), and then to start a new election in {@code epoch + 1} and send a fresh Vote request.
     */
    @Test
    public void testRetryElection() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;
        int exponentialFactor = 85;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(exponentialFactor))
            .build();
        context.assertUnknownLeader(0);

        // Let two election timeouts pass so the node starts campaigning.
        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);
        int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // The other voter rejects the vote; the node remains a candidate in the same epoch.
        context.deliverResponse(correlationId, otherNodeId, context.voteResponse(false, Optional.empty(), 1));
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // One millisecond short of the backoff: still the same epoch.
        context.time.sleep(context.electionBackoffMaxMs - 1);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // The final millisecond elapses and a new election is expected.
        context.time.sleep(1L);
        context.client.poll();
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch + 1, localId);
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
    }

    /**
     * A voter started with a known elected leader and no log entries is expected to report that
     * leader and to send a Fetch request in {@code epoch} with offset 0 and last epoch 0.
     */
    @Test
    public void testInitializeAsFollowerEmptyLog() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 0L, 0);
    }

    /**
     * A voter started with a known elected leader and one record appended in {@code lastEpoch}
     * is expected to report that leader and to send a Fetch request in {@code epoch} with
     * offset 1 and last epoch {@code lastEpoch}.
     */
    @Test
    public void testInitializeAsFollowerNonEmptyLog() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
    }

    /**
     * A following voter sends its Fetch request and then 50 000 ms of mock time pass with no
     * response. The test expects the node to send a Vote request for {@code epoch + 1}
     * advertising its log position and to have voted for itself.
     */
    @Test
    public void testVoterBecomeCandidateAfterFetchTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);

        // NOTE(review): 50000 ms looks like a compiler-inlined fetch timeout constant - confirm.
        context.time.sleep(50000L);
        context.pollUntilRequest();

        context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1);
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * A node outside the voter set with no prior state is expected to send a Fetch request
     * (epoch 0, offset 0, last epoch 0) to one of the voters. When the reply is
     * {@code FENCED_LEADER_EPOCH} carrying {@code epoch} and {@code leaderId}, the node is
     * expected to adopt that leader.
     */
    @Test
    public void testInitializeObserverNoPreviousState() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        // The local id is deliberately left out of the voter set.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        // The contacted voter answers with the current epoch and leader.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * A node outside the voter set receives {@code UNKNOWN_SERVER_ERROR} for its first Fetch
     * request. After 50 ms of mock time the test expects an identical Fetch request to be sent
     * again, and a {@code FENCED_LEADER_EPOCH} reply carrying {@code epoch} and {@code leaderId}
     * to make the node adopt that leader.
     */
    @Test
    public void testObserverQuorumDiscoveryFailure() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int epoch = 5;
        // The local id is deliberately left out of the voter set.
        Set<Integer> voters = Utils.mkSet(leaderId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        // The first attempt fails with a response that carries no leader information.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(-1, -1, MemoryRecords.EMPTY, -1L, Errors.UNKNOWN_SERVER_ERROR));
        context.client.poll();

        // NOTE(review): 50 ms looks like a compiler-inlined backoff constant - confirm.
        context.time.sleep(50L);
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        // The second attempt is answered with the current epoch and leader.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * A node outside the voter set learns the leader from a {@code FENCED_LEADER_EPOCH} fetch
     * response. After 50 000 ms of mock time pass, the test expects another Fetch request
     * addressed to some member of the voter set, now carrying the learned {@code epoch}.
     */
    @Test
    public void testObserverSendDiscoveryFetchAfterFetchTimeout() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        // The local id is deliberately left out of the voter set.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        // NOTE(review): 50000 ms looks like a compiler-inlined fetch timeout constant - confirm.
        context.time.sleep(50000L);
        context.pollUntilRequest();

        fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
    }

    /**
     * Sends five malformed Fetch requests to a leader and checks the error in each response.
     * Every response is expected to carry the leader's {@code epoch} and id. Four cases expect
     * {@code INVALID_REQUEST}; the request with epoch {@code epoch + 1} expects
     * {@code UNKNOWN_LEADER_EPOCH}.
     * NOTE(review): the argument meanings given in the inline comments are inferred from how
     * {@code fetchRequest} is used in sibling tests - confirm against the helper.
     */
    @Test
    public void testInvalidFetchRequest() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
        OptionalInt leader = OptionalInt.of(localId);

        // Third argument (presumably the fetch offset) is negative.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, leader);

        // Fourth argument (presumably the last fetched epoch) is negative.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, -1, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, leader);

        // Fourth argument is larger than the epoch passed as first argument.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, epoch + 1, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, leader);

        // First argument is an epoch the leader has not reached.
        context.deliverRequest(context.fetchRequest(epoch + 1, otherNodeId, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, leader);

        // Fifth argument (presumably the max wait time) is negative.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, 0, -1));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, leader);
    }

    /**
     * Runs once per Fetch API version. The request is built for that version and the test checks
     * where the replica id is stored: the top-level field below version 15, the replica-state
     * field from version 15 on, with the unused location holding -1. The built request is then
     * delivered to the leader, which is expected to answer {@code NONE} and set the high
     * watermark to 1.
     *
     * @param version the Fetch request version under test
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.FETCH)
    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Before any follower fetch there is no high watermark.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0);
        FetchRequestData request = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();

        // These assertions read the object that was handed to the builder, not `request`.
        boolean usesTopLevelReplicaId = version < 15;
        int expectedTopLevelReplicaId = usesTopLevelReplicaId ? otherNodeId : -1;
        int expectedReplicaStateId = usesTopLevelReplicaId ? -1 : otherNodeId;
        Assertions.assertEquals(expectedTopLevelReplicaId, fetchRequestData.replicaId());
        Assertions.assertEquals(expectedReplicaStateId, fetchRequestData.replicaState().replicaId());

        context.deliverRequest(request);
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());
    }

    /**
     * Sends four Fetch requests that differ only in cluster id. Each request is otherwise
     * invalid in the same way (third argument is -5), so a passing cluster id check shows up as
     * {@code INVALID_REQUEST}. The context's own cluster id and a {@code null} id are expected
     * to pass; an empty id and {@code "invalid-uuid"} are expected to give
     * {@code INCONSISTENT_CLUSTER_ID}.
     */
    @Test
    public void testFetchRequestClusterIdValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
        OptionalInt leader = OptionalInt.of(localId);

        // Matching cluster id.
        String validClusterId = context.clusterId.toString();
        context.deliverRequest(context.fetchRequest(epoch, validClusterId, otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, leader);

        // Null cluster id.
        context.deliverRequest(context.fetchRequest(epoch, null, otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, leader);

        // Empty cluster id.
        context.deliverRequest(context.fetchRequest(epoch, "", otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);

        // Cluster id that does not match.
        context.deliverRequest(context.fetchRequest(epoch, "invalid-uuid", otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }

    /**
     * Sends four Vote requests to a leader that differ only in cluster id. The context's own
     * cluster id and a {@code null} id are expected to pass validation: the response is
     * {@code NONE}, names the local leader and refuses the vote. An empty id and
     * {@code "invalid-uuid"} are expected to give {@code INCONSISTENT_CLUSTER_ID}.
     *
     * <p>The first two cases used to be the same call, so the {@code null} path was never
     * exercised. They now pass the cluster id explicitly, as the sibling Fetch, BeginQuorumEpoch
     * and EndQuorumEpoch cluster id tests do.
     */
    @Test
    public void testVoteRequestClusterIdValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Matching cluster id.
        context.deliverRequest(context.voteRequest(context.clusterId.toString(), epoch, localId, 0, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);

        // Null cluster id; the cast pins the String overload.
        context.deliverRequest(context.voteRequest((String) null, epoch, localId, 0, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);

        // Empty cluster id.
        context.deliverRequest(context.voteRequest("", epoch, localId, 0, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);

        // Cluster id that does not match.
        context.deliverRequest(context.voteRequest("invalid-uuid", epoch, localId, 0, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }

    /**
     * Sends four BeginQuorumEpoch requests to a leader. The context's own cluster id, and a
     * request built through the overload that takes no cluster id, are expected to give a
     * {@code NONE} response naming the local leader. An empty id and {@code "invalid-uuid"} are
     * expected to give {@code INCONSISTENT_CLUSTER_ID}.
     */
    @Test
    public void testBeginQuorumEpochRequestClusterIdValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
        OptionalInt leader = OptionalInt.of(localId);

        // Matching cluster id.
        String validClusterId = context.clusterId.toString();
        context.deliverRequest(context.beginEpochRequest(validClusterId, epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, leader);

        // No cluster id argument supplied by the test.
        context.deliverRequest(context.beginEpochRequest(epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, leader);

        // Empty cluster id.
        context.deliverRequest(context.beginEpochRequest("", epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);

        // Cluster id that does not match.
        context.deliverRequest(context.beginEpochRequest("invalid-uuid", epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }

    /**
     * Sends four EndQuorumEpoch requests to a leader. The context's own cluster id, and a
     * request built through the overload that takes no cluster id, are expected to give a
     * {@code NONE} response naming the local leader. An empty id and {@code "invalid-uuid"} are
     * expected to give {@code INCONSISTENT_CLUSTER_ID}.
     */
    @Test
    public void testEndQuorumEpochRequestClusterIdValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
        OptionalInt leader = OptionalInt.of(localId);

        // Matching cluster id.
        String validClusterId = context.clusterId.toString();
        context.deliverRequest(
            context.endEpochRequest(validClusterId, epoch, localId, Collections.singletonList(otherNodeId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, leader);

        // No cluster id argument supplied by the test.
        context.deliverRequest(
            context.endEpochRequest(epoch, localId, Collections.singletonList(otherNodeId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, leader);

        // Empty cluster id.
        context.deliverRequest(
            context.endEpochRequest("", epoch, localId, Collections.singletonList(otherNodeId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);

        // Cluster id that does not match.
        context.deliverRequest(
            context.endEpochRequest("invalid-uuid", epoch, localId, Collections.singletonList(otherNodeId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }

    /**
     * Sends Vote, BeginQuorumEpoch and EndQuorumEpoch requests that name a node id outside
     * the voter set and expects each to be answered with {@code INCONSISTENT_VOTER_SET}.
     */
    @Test
    public void testVoterOnlyRequestValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Node id 2 is not a member of the voter set {0, 1}.
        int nonVoterId = 2;

        context.deliverRequest(context.voteRequest(epoch, nonVoterId, 0, 0L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId), false);

        context.deliverRequest(context.beginEpochRequest(epoch, nonVoterId));
        context.client.poll();
        context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));

        context.deliverRequest(context.endEpochRequest(epoch, nonVoterId, Collections.singletonList(otherNodeId)));
        context.client.poll();
        context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));
    }

    /**
     * Delivers three malformed Vote requests to a follower and expects each to be answered
     * with {@code INVALID_REQUEST} while the elected leader and epoch stay unchanged.
     */
    @Test
    public void testInvalidVoteRequest() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Negative value in the last (long) argument.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, 0, -5L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(otherNodeId), false);
        context.assertElectedLeader(epoch, otherNodeId);

        // Negative value in the third (int) argument.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, -1, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(otherNodeId), false);
        context.assertElectedLeader(epoch, otherNodeId);

        // Third argument equal to the epoch being requested (epoch + 1).
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch + 1, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(otherNodeId), false);
        context.assertElectedLeader(epoch, otherNodeId);
    }

    /**
     * A fetch delivered with a 500 ms max wait gets no response on the first poll; once the
     * mock clock has advanced by the max wait, an empty {@code NONE} response is sent.
     */
    @Test
    public void testPurgatoryFetchTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        int maxWaitTimeMs = 500;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, maxWaitTimeMs));
        context.client.poll();
        // Nothing has been sent yet.
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        // Advance the mock clock by the max wait and poll again.
        context.time.sleep(maxWaitTimeMs);
        context.client.poll();

        MemoryRecords fetchedRecords = context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(0, fetchedRecords.sizeInBytes());
    }

    /**
     * A fetch delivered with a 500 ms max wait gets no response on the first poll; after
     * records are appended on the leader, the next poll sends a response containing them.
     */
    @Test
    public void testPurgatoryFetchSatisfiedByWrite() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 500));
        context.client.poll();
        // Nothing has been sent yet.
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        // Append on the leader, then poll so the pending fetch can be answered.
        String[] appendRecords = new String[]{"a", "b", "c"};
        context.client.scheduleAppend(epoch, Arrays.asList(appendRecords));
        context.client.poll();

        MemoryRecords fetchedRecords = context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        RaftClientTestContext.assertMatchingRecords(appendRecords, fetchedRecords);
    }

    /**
     * A fetch left unanswered on a leader is completed with an empty
     * {@code NOT_LEADER_OR_FOLLOWER} response once a BeginQuorumEpoch request for a later
     * epoch from another voter has been processed.
     */
    @Test
    public void testPurgatoryFetchCompletedByFollowerTransition() throws Exception {
        int localId = 0;
        int voter1 = localId;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Deliver a fetch with a 500 ms max wait; no fetch response may be sent yet.
        context.deliverRequest(context.fetchRequest(epoch, voter2, 1L, epoch, 500));
        context.client.poll();
        Assertions.assertTrue(context.channel.drainSendQueue().stream()
            .noneMatch(msg -> msg.data() instanceof FetchResponseData));

        // voter3 announces itself as leader of the next epoch.
        context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
        context.pollUntilResponse();
        context.assertElectedLeader(epoch + 1, voter3);
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch + 1, OptionalInt.of(voter3));

        // The pending fetch is answered with an error and no records.
        MemoryRecords fetchedRecords = context.assertSentFetchPartitionResponse(
            Errors.NOT_LEADER_OR_FOLLOWER, epoch + 1, OptionalInt.of(voter3));
        Assertions.assertEquals(0, fetchedRecords.sizeInBytes());
    }

    /**
     * A fetch response that arrives after the local node has already voted for itself in
     * the next epoch must not change the log or the candidate state.
     */
    @Test
    public void testFetchResponseIgnoredAfterBecomingCandidate() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Wait until a fetch has been sent and remember its correlation id.
        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        // Advance the mock clock by 50 s; afterwards the node has voted for itself.
        // NOTE(review): 50000 looks like a timeout constant inlined by the compiler - confirm.
        context.time.sleep(50000L);
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);

        // Deliver the late response to the earlier fetch.
        MemoryRecords records = context.buildBatch(0L, 3, Arrays.asList("a", "b"));
        context.deliverResponse(fetchCorrelationId, otherNodeId,
            context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE));
        context.client.poll();

        // Nothing was appended and the vote state is unchanged.
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * A fetch response from the previous leader that arrives after the local node has
     * accepted a different leader in a later epoch must not change the log or the leader.
     */
    @Test
    public void testFetchResponseIgnoredAfterBecomingFollowerOfDifferentLeader() throws Exception {
        int localId = 0;
        int voter1 = localId;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, voter2)
            .build();
        context.assertElectedLeader(epoch, voter2);

        // Wait until a fetch has been sent and remember its correlation id.
        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        // voter3 announces itself as leader of the next epoch.
        context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
        context.client.poll();
        context.assertElectedLeader(epoch + 1, voter3);

        // Deliver the late response from the old leader (voter2).
        MemoryRecords records = context.buildBatch(0L, 3, Arrays.asList("a", "b"));
        FetchResponseData response = context.fetchResponse(epoch, voter2, records, 0L, Errors.NONE);
        context.deliverResponse(fetchCorrelationId, voter2, response);
        context.client.poll();

        // Nothing was appended and the leader is unchanged.
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch + 1, voter3);
    }

    /**
     * Vote responses that arrive after the local candidate has already accepted another
     * node as leader of the same epoch must leave the elected leader unchanged.
     */
    @Test
    public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception {
        int localId = 0;
        int voter1 = localId;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .build();
        context.assertUnknownLeader(epoch - 1);

        // Advance the mock clock by twice the election timeout; the node then votes for itself.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        // One vote request is expected for each of the two other voters.
        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0L);
        Assertions.assertEquals(2, voteRequests.size());

        // voter3 announces itself as leader before any vote response arrives.
        context.deliverRequest(context.beginEpochRequest(epoch, voter3));
        context.client.poll();
        context.assertElectedLeader(epoch, voter3);

        // Deliver both (late) vote responses.
        VoteResponseData voteResponse1 = context.voteResponse(false, Optional.empty(), epoch);
        context.deliverResponse(voteRequests.get(0).correlationId, voter2, voteResponse1);
        VoteResponseData voteResponse2 = context.voteResponse(false, Optional.of(voter3), epoch);
        context.deliverResponse(voteRequests.get(1).correlationId, voter3, voteResponse2);
        context.client.poll();

        context.assertElectedLeader(epoch, voter3);
    }

    /**
     * After a fetch to the known leader is answered with {@code BROKER_NOT_AVAILABLE}, a
     * node outside the voter set sends its next fetch to a different voter and ends up
     * with the same elected leader once that voter replies.
     */
    @Test
    public void testObserverLeaderRediscoveryAfterBrokerNotAvailableError() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        // localId is deliberately not part of the voter set.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.discoverLeaderAsObserver(leaderId, epoch);

        // First fetch goes to the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
        int firstDestination = fetchRequest1.destinationId();
        Assertions.assertEquals(leaderId, firstDestination);
        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);

        // The leader's reply is BROKER_NOT_AVAILABLE.
        context.deliverResponse(fetchRequest1.correlationId, firstDestination,
            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1L, Errors.BROKER_NOT_AVAILABLE));

        // Second fetch goes to another member of the voter set.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
        int secondDestination = fetchRequest2.destinationId();
        Assertions.assertNotEquals(leaderId, secondDestination);
        Assertions.assertTrue(voters.contains(secondDestination));
        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);

        // The reply names leaderId as the leader; the error depends on who was asked.
        Errors error = secondDestination == leaderId ? Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;
        context.deliverResponse(fetchRequest2.correlationId, secondDestination,
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, error));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * After a fetch to the known leader goes unanswered for the request timeout, a node
     * outside the voter set sends its next fetch to a different voter and ends up with
     * the same elected leader once that voter replies.
     */
    @Test
    public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        // localId is deliberately not part of the voter set.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.discoverLeaderAsObserver(leaderId, epoch);

        // First fetch goes to the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
        int firstDestination = fetchRequest1.destinationId();
        Assertions.assertEquals(leaderId, firstDestination);
        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);

        // No response is delivered; advance the mock clock by the request timeout.
        context.time.sleep(context.requestTimeoutMs());

        // Second fetch goes to another member of the voter set.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
        int secondDestination = fetchRequest2.destinationId();
        Assertions.assertNotEquals(leaderId, secondDestination);
        Assertions.assertTrue(voters.contains(secondDestination));
        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);

        // The reply carries FENCED_LEADER_EPOCH and names leaderId as the leader.
        context.deliverResponse(fetchRequest2.correlationId, secondDestination,
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * A leader asked to shut down keeps running, sends an EndQuorumEpoch request, grants a
     * vote for the next epoch, and stops once the BeginQuorumEpoch request of the new
     * epoch has been delivered; the shutdown future then completes with {@code null}.
     */
    @Test
    public void testLeaderGracefulShutdown() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        int shutdownTimeoutMs = 5000;
        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isShuttingDown());
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // The next outbound request is EndQuorumEpoch to the other voter; still running.
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isShuttingDown());
        Assertions.assertTrue(context.client.isRunning());
        context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);

        // The other voter asks for a vote in the next epoch and receives it.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);

        // The other voter announces itself as leader of the next epoch.
        context.deliverRequest(context.beginEpochRequest(epoch + 1, otherNodeId));
        TestUtils.waitForCondition(() -> {
            context.client.poll();
            return !context.client.isRunning();
        }, 5000L, "Client failed to shutdown before expiration of timeout");

        Assertions.assertFalse(context.client.isShuttingDown());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * With one follower having fetched at offset 3 and another at offset 1, a leader that
     * is shutting down sends EndQuorumEpoch requests to both, and the list passed as the
     * expected last argument names the follower at offset 3 first.
     */
    @Test
    public void testEndQuorumEpochSentBasedOnFetchOffset() throws Exception {
        int localId = 0;
        int closeFollower = 2;
        int laggingFollower = 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, closeFollower, laggingFollower);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // The lagging follower fetches at offset 1.
        context.deliverRequest(context.fetchRequest(epoch, laggingFollower, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(1L, epoch);

        // Append two records on the leader.
        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
        context.client.poll();

        // The close follower fetches at offset 3.
        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        // Start shutting down; the client keeps running while it sends its requests.
        context.client.shutdown(context.electionTimeoutMs() * 2);
        Assertions.assertTrue(context.client.isRunning());
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isRunning());

        context.collectEndQuorumRequests(
            epoch,
            Utils.mkSet(closeFollower, laggingFollower),
            Optional.of(Arrays.asList(closeFollower, laggingFollower)));
    }

    /**
     * A DescribeQuorum request sent to a node whose leader is unknown yields a response
     * with top-level error {@code NONE} and a single partition entry whose error is
     * {@code NOT_LEADER_OR_FOLLOWER}.
     */
    @Test
    public void testDescribeQuorumNonLeader() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
        context.pollUntilResponse();

        DescribeQuorumResponseData responseData = context.collectDescribeQuorumResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(responseData.errorCode()));

        // Exactly one topic, matching the metadata partition's topic.
        Assertions.assertEquals(1, responseData.topics().size());
        DescribeQuorumResponseData.TopicData topicData = responseData.topics().get(0);
        Assertions.assertEquals(context.metadataPartition.topic(), topicData.topicName());

        // Exactly one partition, carrying the partition-level error.
        Assertions.assertEquals(1, topicData.partitions().size());
        DescribeQuorumResponseData.PartitionData partitionData = topicData.partitions().get(0);
        Assertions.assertEquals(context.metadataPartition.partition(), partitionData.partitionIndex());
        Assertions.assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(partitionData.errorCode()));
    }

    /**
     * After two voters and one non-voter have fetched from the leader at different mock
     * times, a DescribeQuorum response must list the expected log end offset and
     * fetch / caught-up timestamps for each voter and for the observer.
     */
    @Test
    public void testDescribeQuorum() throws Exception {
        int localId = 0;
        int closeFollower = 2;
        int laggingFollower = 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, closeFollower, laggingFollower);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // The lagging follower fetches at offset 1.
        long laggingFollowerFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, laggingFollower, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(1L, epoch);

        // Append two records on the leader.
        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
        context.client.poll();

        // 100 ms later the close follower fetches at offset 3.
        context.time.sleep(100L);
        long closeFollowerFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        // Another 100 ms later a node outside the voter set fetches at offset 0.
        int observerId = 3;
        context.time.sleep(100L);
        long observerFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        // Another 100 ms later the quorum is described.
        context.time.sleep(100L);
        context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
        context.pollUntilResponse();

        // Expected states are built in the same order as they are asserted; the leader's
        // own timestamps are read from the mock clock at assertion time.
        DescribeQuorumResponseData.ReplicaState leaderState = new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(localId)
            .setLogEndOffset(3L)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        DescribeQuorumResponseData.ReplicaState laggingState = new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(laggingFollower)
            .setLogEndOffset(1L)
            .setLastFetchTimestamp(laggingFollowerFetchTime)
            .setLastCaughtUpTimestamp(laggingFollowerFetchTime);
        DescribeQuorumResponseData.ReplicaState closeState = new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(closeFollower)
            .setLogEndOffset(3L)
            .setLastFetchTimestamp(closeFollowerFetchTime)
            .setLastCaughtUpTimestamp(closeFollowerFetchTime);
        DescribeQuorumResponseData.ReplicaState observerState = new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(observerId)
            .setLogEndOffset(0L)
            .setLastFetchTimestamp(observerFetchTime)
            .setLastCaughtUpTimestamp(-1L);

        context.assertSentDescribeQuorumResponse(
            localId,
            epoch,
            3L,
            Arrays.asList(leaderState, laggingState, closeState),
            Collections.singletonList(observerState));
    }

    /**
     * A leader asked to shut down sends an EndQuorumEpoch request; when the shutdown
     * timeout elapses without further input, the client stops and the shutdown future
     * fails with a {@link TimeoutException}.
     */
    @Test
    public void testLeaderGracefulShutdownTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        int shutdownTimeoutMs = 5000;
        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // The next outbound request is EndQuorumEpoch to the other voter; still running.
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isRunning());
        context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);

        // Let the whole shutdown timeout elapse on the mock clock.
        context.time.sleep(shutdownTimeoutMs);
        context.client.poll();

        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isCompletedExceptionally());
        TestUtils.assertFutureThrows(shutdownFuture, TimeoutException.class);
    }

    /**
     * A follower asked to shut down is still running right after the call and has stopped
     * after the next poll; the shutdown future then completes with {@code null}.
     */
    @Test
    public void testFollowerGracefulShutdown() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);
        context.client.poll();

        int shutdownTimeoutMs = 5000;
        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // A single poll is enough to finish the shutdown.
        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * A node outside the voter set that is asked to shut down is still running right after
     * the call and has stopped after the next poll; the shutdown future then completes
     * with {@code null}.
     */
    @Test
    public void testObserverGracefulShutdown() throws Exception {
        int localId = 0;
        int voter1 = 1;
        int voter2 = 2;
        int epoch = 5;
        // localId is deliberately not part of the voter set.
        Set<Integer> voters = Utils.mkSet(voter1, voter2);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();
        context.client.poll();
        context.assertUnknownLeader(epoch);

        int shutdownTimeoutMs = 5000;
        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // A single poll is enough to finish the shutdown.
        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * In a quorum whose only voter is the local node, the node is leader of epoch 1, sends
     * nothing on poll, and stops on the first poll after shutdown has been requested.
     */
    @Test
    public void testGracefulShutdownSingleMemberQuorum() throws IOException {
        int nodeId = 0;
        Set<Integer> soleVoter = Collections.singleton(nodeId);
        RaftClientTestContext ctx = new RaftClientTestContext.Builder(nodeId, soleVoter).build();

        ctx.assertElectedLeader(1, nodeId);

        // Polling produces no outbound messages.
        ctx.client.poll();
        int sentCount = ctx.channel.drainSendQueue().size();
        Assertions.assertEquals(0, sentCount);

        // Request shutdown with a 5 s timeout; the client is still running until the next poll.
        int timeoutMs = 5000;
        ctx.client.shutdown(timeoutMs);
        Assertions.assertTrue(ctx.client.isRunning());

        ctx.client.poll();
        Assertions.assertFalse(ctx.client.isRunning());
    }

    /**
     * A follower that receives a two-record batch in a fetch response ends with log end
     * offset 2 and first unflushed offset 2.
     */
    @Test
    public void testFollowerReplication() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Wait until a fetch has been sent and remember its correlation id.
        context.pollUntilRequest();
        int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        // Answer the fetch with a batch of two records starting at offset 0.
        MemoryRecords records = context.buildBatch(0L, 3, Arrays.asList("a", "b"));
        FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
        context.client.poll();

        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(2L, context.log.firstUnflushedOffset());
    }

    /**
     * Runs three fetch round trips on a follower (empty, two records, empty) and checks
     * the log end offset and the high watermark reported by the client after each one.
     */
    @Test
    public void testEmptyRecordSetInFetchResponse() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Round 1: empty response carrying the value 0 in its long argument.
        context.pollUntilRequest();
        int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
        FetchResponseData fetchResponse = context.fetchResponse(epoch, otherNodeId, MemoryRecords.EMPTY, 0L, Errors.NONE);
        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, fetchResponse);
        context.client.poll();
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());

        // Round 2: two records are appended; the high watermark stays at 0.
        context.pollUntilRequest();
        MemoryRecords records = context.buildBatch(0L, epoch, Arrays.asList("a", "b"));
        fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
        fetchResponse = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, fetchResponse);
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());

        // Round 3: empty response carrying the value 2; the high watermark moves to 2.
        context.pollUntilRequest();
        fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 2L, epoch);
        fetchResponse = context.fetchResponse(epoch, otherNodeId, MemoryRecords.EMPTY, 2L, Errors.NONE);
        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, fetchResponse);
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), context.client.highWatermark());
    }

    /**
     * After a newly elected leader has answered a fetch from the other voter, letting the
     * request timeout elapse must not cause any further message to be sent.
     */
    @Test
    public void testFetchShouldBeTreatedAsLeaderAcknowledgement() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(r -> r.mockNextInt(10000, 0))
            .withUnknownLeader(epoch - 1)
            .build();

        // Let the election timeout elapse, then have the votes granted.
        context.time.sleep(context.electionTimeoutMs());
        context.expectAndGrantVotes(epoch);

        context.pollUntilRequest();
        context.assertSentBeginQuorumEpochRequest(epoch, 1);

        // The other voter fetches from the new leader and is answered without error.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, 0, 500));
        context.client.poll();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // After the request timeout nothing more is sent.
        context.time.sleep(context.requestTimeoutMs());
        context.client.poll();
        List<RaftRequest.Outbound> sentMessages = context.channel.drainSendQueue();
        Assertions.assertEquals(0, sentMessages.size());
    }

    /**
     * In a single-voter quorum, checks that the high watermark follows appends and that
     * fetching the whole log from another node id returns exactly two batches: a control
     * batch holding the leader change message and a batch with the appended records.
     */
    @Test
    public void testLeaderAppendSingleMemberQuorum() throws Exception {
        int localId = 0;
        int epoch = 1;
        Set<Integer> voters = Collections.singleton(localId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        // Captured before any poll; compared with the control record's timestamp below.
        long now = context.time.milliseconds();

        context.pollUntil(() -> context.log.endOffset().offset == 1L);
        context.assertElectedLeader(epoch, localId);
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        String[] appendRecords = new String[]{"a", "b", "c"};

        // A poll without a pending append leaves the high watermark unchanged.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        // Appending three records moves the high watermark to 4 on the next poll.
        context.client.scheduleAppend(context.currentEpoch(), Arrays.asList(appendRecords));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(4L), context.client.highWatermark());

        // Fetch from the start of the log, continuing after the last batch received,
        // until a response comes back without any batch.
        int otherNodeId = 1;
        List<MutableRecordBatch> batches = new ArrayList<>(2);
        boolean appended = true;
        while (appended) {
            long fetchOffset = 0L;
            int lastFetchedEpoch = 0;
            if (!batches.isEmpty()) {
                MutableRecordBatch lastBatch = batches.get(batches.size() - 1);
                fetchOffset = lastBatch.lastOffset() + 1L;
                lastFetchedEpoch = lastBatch.partitionLeaderEpoch();
            }
            context.deliverRequest(context.fetchRequest(epoch, otherNodeId, fetchOffset, lastFetchedEpoch, 0));
            context.pollUntilResponse();
            MemoryRecords fetchedRecords = context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
            List<MutableRecordBatch> fetchedBatch = Utils.toList(fetchedRecords.batchIterator());
            batches.addAll(fetchedBatch);
            appended = !fetchedBatch.isEmpty();
        }
        Assertions.assertEquals(2, batches.size());

        // First batch: a control batch with one record, the leader change message.
        MutableRecordBatch leaderChangeBatch = batches.get(0);
        Assertions.assertTrue(leaderChangeBatch.isControlBatch());
        List<Record> readRecords = Utils.toList(leaderChangeBatch.iterator());
        Assertions.assertEquals(1, readRecords.size());
        Record record = readRecords.get(0);
        Assertions.assertEquals(now, record.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Collections.singletonList(localId),
            Collections.singletonList(localId),
            record.key(),
            record.value());

        // Second batch: the three appended records, in order.
        MutableRecordBatch batch = batches.get(1);
        Assertions.assertEquals(epoch, batch.partitionLeaderEpoch());
        readRecords = Utils.toList(batch.iterator());
        Assertions.assertEquals(3, readRecords.size());
        for (int i = 0; i < appendRecords.length; ++i) {
            Assertions.assertEquals(appendRecords[i], Utils.utf8(readRecords.get(i).value()));
        }
    }

    /**
     * A follower whose log ends at offset 3 receives a diverging fetch response; afterwards
     * its log end offset and first unflushed offset are 2 and its next fetch starts at
     * offset 2.
     */
    @Test
    public void testFollowerLogReconciliation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Arrays.asList("foo", "bar"))
            .appendToLog(lastEpoch, Arrays.asList("baz"))
            .build();
        context.assertElectedLeader(epoch, otherNodeId);
        // Three records were appended while building the context.
        Assertions.assertEquals(3L, context.log.endOffset().offset);

        // The first fetch starts at the local log end.
        context.pollUntilRequest();
        int correlationId = context.assertSentFetchRequest(epoch, 3L, lastEpoch);

        // The leader answers with a diverging response (values 2, lastEpoch, 1).
        FetchResponseData response = context.divergingFetchResponse(epoch, otherNodeId, 2L, lastEpoch, 1L);
        context.deliverResponse(correlationId, otherNodeId, response);
        context.client.poll();

        // The log now ends at offset 2.
        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(2L, context.log.firstUnflushedOffset());

        // The next fetch starts at offset 2.
        context.client.poll();
        context.assertSentFetchRequest(epoch, 2L, lastEpoch);
    }

    /**
     * Verifies that the raft metrics are registered, report the expected values for a
     * single-voter quorum, follow the log as records are appended, and are removed again when
     * the client is closed.
     */
    @Test
    public void testMetrics() throws Exception {
        int localId = 0;
        int epoch = 1;
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId))
            .build();
        context.pollUntil(() -> context.log.endOffset().offset == 1L);

        // Every expected metric must be registered; the metric name is the failure message.
        List<String> expectedMetricNames = Arrays.asList(
            "current-state",
            "current-leader",
            "current-vote",
            "current-epoch",
            "high-watermark",
            "log-end-offset",
            "log-end-epoch",
            "number-unknown-voter-connections",
            "poll-idle-ratio-avg",
            "commit-latency-avg",
            "commit-latency-max",
            "election-latency-avg",
            "election-latency-max",
            "fetch-records-rate",
            "append-records-rate");
        for (String metricName : expectedMetricNames) {
            Assertions.assertNotNull(getMetric(context.metrics, metricName), metricName);
        }

        Assertions.assertEquals("leader", getMetric(context.metrics, "current-state").metricValue());
        // Numeric values are compared as doubles, like the high-watermark/log-end-offset
        // assertions: a boxed Integer is never equal to a boxed Double, even when both are 0.
        Assertions.assertEquals((double) localId, getMetric(context.metrics, "current-leader").metricValue());
        Assertions.assertEquals((double) localId, getMetric(context.metrics, "current-vote").metricValue());
        Assertions.assertEquals((double) epoch, getMetric(context.metrics, "current-epoch").metricValue());
        Assertions.assertEquals(1.0, getMetric(context.metrics, "high-watermark").metricValue());
        Assertions.assertEquals(1.0, getMetric(context.metrics, "log-end-offset").metricValue());
        Assertions.assertEquals((double) epoch, getMetric(context.metrics, "log-end-epoch").metricValue());

        // Appending three records is expected to move both offsets from 1 to 4.
        context.client.scheduleAppend(epoch, Arrays.asList("a", "b", "c"));
        context.client.poll();
        Assertions.assertEquals(4.0, getMetric(context.metrics, "high-watermark").metricValue());
        Assertions.assertEquals(4.0, getMetric(context.metrics, "log-end-offset").metricValue());
        Assertions.assertEquals((double) epoch, getMetric(context.metrics, "log-end-epoch").metricValue());

        // After close exactly one metric remains in the registry.
        // NOTE(review): presumably the registry's own built-in count metric - confirm.
        context.client.close();
        Assertions.assertEquals(1, context.metrics.metrics().size());
    }

    /**
     * A Fetch response carrying CLUSTER_AUTHORIZATION_FAILED must make the next poll throw
     * {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInFetch() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();
        context.assertElectedLeader(epoch, leaderId);

        // Wait for the follower's fetch and answer it with an authorization error.
        context.pollUntilRequest();
        int correlationId = context.assertSentFetchRequest(epoch, 0L, 0);
        FetchResponseData unauthorized = new FetchResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(correlationId, leaderId, unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, () -> context.client.poll());
    }

    /**
     * A BeginQuorumEpoch response carrying CLUSTER_AUTHORIZATION_FAILED must make the next
     * poll throw {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInBeginQuorumEpoch() throws Exception {
        int localId = 0;
        int peerId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withUnknownLeader(epoch - 1)
            .build();

        // Let the election timeout elapse, then have the vote granted for the new epoch.
        context.time.sleep(context.electionTimeoutMs());
        context.expectAndGrantVotes(epoch);

        // Reject the resulting BeginQuorumEpoch request with an authorization error.
        context.pollUntilRequest();
        int correlationId = context.assertSentBeginQuorumEpochRequest(epoch, 1);
        BeginQuorumEpochResponseData unauthorized = new BeginQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(correlationId, peerId, unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, () -> context.client.poll());
    }

    /**
     * A Vote response carrying CLUSTER_AUTHORIZATION_FAILED must make the next poll throw
     * {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInVote() throws Exception {
        int localId = 0;
        int peerId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .build();

        // Sleep well past the election timeout; the node is then expected to have voted for
        // itself in the new epoch.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        // Reject the vote request with an authorization error.
        int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
        VoteResponseData unauthorized = new VoteResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(correlationId, peerId, unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, () -> context.client.poll());
    }

    /**
     * An EndQuorumEpoch response carrying CLUSTER_AUTHORIZATION_FAILED must make the next
     * poll throw {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInEndQuorumEpoch() throws Exception {
        int localId = 0;
        int peerId = 1;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Shutting down as leader is expected to trigger an EndQuorumEpoch request to the peer.
        context.client.shutdown(5000);
        context.pollUntilRequest();
        int correlationId = context.assertSentEndQuorumEpochRequest(epoch, peerId);

        EndQuorumEpochResponseData unauthorized = new EndQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(correlationId, peerId, unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, () -> context.client.poll());
    }

    /**
     * Starting from an empty log, the listener must not see a claimed epoch or any commit
     * until the other voter has fetched up to the end of the leader's log; only then are the
     * commit and the leader-change notification delivered.
     */
    @Test
    public void testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffsetOnEmptyLog() throws Exception {
        int localId = 0;
        int followerId = 1;
        Set<Integer> voters = Utils.mkSet(localId, followerId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        context.client.poll();
        int epoch = context.currentEpoch();

        // Becoming leader leaves one entry in the log, but nothing is visible to the listener.
        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch from offset 0 changes nothing for the listener.
        context.deliverRequest(context.fetchRequest(epoch, followerId, 0L, 0, 0));
        context.client.poll();
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch from offset 1 in the current epoch lets the listener observe offset 0.
        context.deliverRequest(context.fetchRequest(epoch, followerId, 1L, epoch, 0));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(0L), context.listener.lastCommitOffset());

        // One more poll and the listener has claimed the epoch, starting at offset 0.
        context.client.poll();
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(0L, context.listener.claimedEpochStartOffset(epoch));
    }

    /**
     * With a pre-populated log, the listener must not see a claimed epoch or any commit until
     * the other voter has fetched up to the leader's log end; afterwards it receives all
     * committed batches and then claims the epoch.
     */
    @Test
    public void testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffset() throws Exception {
        int localId = 0;
        int followerId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, followerId);

        List<String> batch1 = Arrays.asList("1", "2", "3");
        List<String> batch2 = Arrays.asList("4", "5", "6");
        List<String> batch3 = Arrays.asList("7", "8", "9");
        List<List<String>> expectedBatches = Arrays.asList(batch1, batch2, batch3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, batch1)
            .appendToLog(1, batch2)
            .appendToLog(2, batch3)
            .withUnknownLeader(epoch - 1)
            .build();
        context.becomeLeader();
        context.client.poll();

        // Nine seeded records plus one entry written on becoming leader.
        Assertions.assertEquals(10L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch from the middle of the log exposes nothing to the listener.
        context.deliverRequest(context.fetchRequest(epoch, followerId, 3L, 1, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch from the log end in the current epoch lets the listener catch up.
        context.deliverRequest(context.fetchRequest(epoch, followerId, 10L, epoch, 500));
        context.pollUntil(() -> {
            int seen = 0;
            for (Batch<String> committed : context.listener.committedBatches()) {
                // Only the first three batches are data batches with known contents.
                if (seen < expectedBatches.size()) {
                    Assertions.assertEquals(expectedBatches.get(seen), committed.records());
                }
                seen++;
            }
            // Four batches in total: the three seeded ones plus one additional batch.
            Assertions.assertEquals(4, seen);
            return context.listener.currentClaimedEpoch().isPresent();
        });

        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.of(9L), context.listener.lastCommitOffset());
        Assertions.assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
    }

    /**
     * A listener registered after the leader's high watermark is established must catch up to
     * the same commit offset and claim the same epoch as the initially registered listener.
     */
    @Test
    public void testLateRegisteredListenerCatchesUp() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        List<String> batch1 = Arrays.asList("1", "2", "3");
        List<String> batch2 = Arrays.asList("4", "5", "6");
        List<String> batch3 = Arrays.asList("7", "8", "9");

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, batch1)
            .appendToLog(1, batch2)
            .appendToLog(2, batch3)
            .withUnknownLeader(epoch - 1)
            .build();
        context.becomeLeader();
        context.client.poll();
        Assertions.assertEquals(10L, context.log.endOffset().offset);

        // Let the initial listener catch up.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
        context.pollUntil(() -> OptionalInt.of(epoch).equals(context.listener.currentClaimedEpoch()));
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());
        Assertions.assertEquals(OptionalLong.of(9L), context.listener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));

        // Register a second listener and let it catch up as well.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.pollUntil(() -> OptionalInt.of(epoch).equals(secondListener.currentClaimedEpoch()));
        Assertions.assertEquals(OptionalLong.of(9L), secondListener.lastCommitOffset());
        // Check the late listener itself; previously only the first listener was re-checked here.
        Assertions.assertEquals(OptionalInt.of(epoch), secondListener.currentClaimedEpoch());
        // The first listener must be unaffected by the new registration.
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
    }

    /**
     * A listener that has been unregistered must stop receiving commits, while the listener
     * that is still registered keeps advancing.
     */
    @Test
    public void testReregistrationChangesListenerContext() throws Exception {
        int localId = 0;
        int peerId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        List<String> firstBatch = Arrays.asList("1", "2", "3");
        List<String> secondBatch = Arrays.asList("4", "5", "6");
        List<String> thirdBatch = Arrays.asList("7", "8", "9");

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, firstBatch)
            .appendToLog(1, secondBatch)
            .appendToLog(2, thirdBatch)
            .withUnknownLeader(epoch - 1)
            .build();
        context.becomeLeader();
        context.client.poll();
        Assertions.assertEquals(10L, context.log.endOffset().offset);

        // Bring the default listener up to offset 9.
        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
        context.pollUntil(() -> OptionalLong.of(9L).equals(context.listener.lastCommitOffset()));

        // Register an extra listener, let it reach the same offset, then unregister it.
        RaftClientTestContext.MockListener extraListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(extraListener);
        context.pollUntil(() -> OptionalLong.of(9L).equals(extraListener.lastCommitOffset()));
        context.client.unregister(extraListener);

        // Append one more record; only the default listener should observe it.
        Assertions.assertEquals(10L, context.client.scheduleAppend(epoch, Collections.singletonList("a")));
        context.client.poll();
        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
        context.pollUntil(() -> OptionalLong.of(10L).equals(context.listener.lastCommitOffset()));

        // The unregistered listener stays where it was.
        Assertions.assertEquals(OptionalLong.of(9L), extraListener.lastCommitOffset());
    }

    /**
     * On a follower, fetched batches are handed to the listener only once a later fetch
     * response reports a high watermark that covers them.
     */
    @Test
    public void testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // No high watermark is known before the first fetch response arrives.
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());

        // First fetch: starts at offset 0.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(firstFetch.destinationId()));
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        // The response carries three records but reports a high watermark of 0.
        List<String> firstRecords = Arrays.asList("a", "b", "c");
        MemoryRecords firstBatch = context.buildBatch(0L, 3, firstRecords);
        context.deliverResponse(
            firstFetch.correlationId,
            firstFetch.destinationId(),
            context.fetchResponse(epoch, leaderId, firstBatch, 0L, Errors.NONE));
        context.client.poll();

        // Nothing has been handed to the listener yet.
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());
        Assertions.assertEquals(0, context.listener.numCommittedBatches());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());

        // Second fetch: continues from offset 3.
        context.pollUntilRequest();
        RaftRequest.Outbound secondFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(secondFetch.destinationId()));
        context.assertFetchRequestData(secondFetch, epoch, 3L, 3);

        // This response reports a high watermark of 3, which covers the first batch only.
        List<String> secondRecords = Arrays.asList("d", "e", "f");
        MemoryRecords secondBatch = context.buildBatch(3L, 3, secondRecords);
        context.deliverResponse(
            secondFetch.correlationId,
            secondFetch.destinationId(),
            context.fetchResponse(epoch, leaderId, secondBatch, 3L, Errors.NONE));
        context.client.poll();

        // Exactly the first batch has reached the listener; no epoch has been claimed.
        Assertions.assertEquals(OptionalLong.of(3L), context.client.highWatermark());
        Assertions.assertEquals(1, context.listener.numCommittedBatches());
        Assertions.assertEquals(OptionalLong.of(2L), context.listener.lastCommitOffset());
        Assertions.assertEquals(firstRecords, context.listener.lastCommit().records());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
    }

    /**
     * A listener registered after this node has voted for another candidate must still be
     * delivered committed data up to the known high watermark, without claiming any epoch.
     */
    @Test
    public void testHandleCommitCallbackFiresInVotedState() throws Exception {
        int localId = 0;
        int peerId = 1;
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(2, Arrays.asList("a", "b", "c"))
            .appendToLog(4, Arrays.asList("d", "e", "f"))
            .appendToLog(4, Arrays.asList("g", "h", "i"))
            .withUnknownLeader(epoch - 1)
            .build();

        // Become leader and take a fetch from the peer so the high watermark reaches 10.
        context.becomeLeader();
        context.deliverRequest(context.fetchRequest(epoch, peerId, 10L, epoch, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // A vote request for the next epoch leaves this node voted for the peer; the high
        // watermark is retained.
        int candidateEpoch = epoch + 1;
        context.deliverRequest(context.voteRequest(candidateEpoch, peerId, epoch, 10L));
        context.pollUntilResponse();
        context.assertVotedCandidate(candidateEpoch, peerId);
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // Register a new listener; the node's voted state must not change.
        RaftClientTestContext.MockListener lateListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(lateListener);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, peerId);

        // The new listener catches up to offset 9 and never claims an epoch.
        context.pollUntil(() -> lateListener.lastCommitOffset().equals(OptionalLong.of(9L)));
        Assertions.assertEquals(OptionalLong.of(9L), lateListener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.empty(), lateListener.currentClaimedEpoch());
    }

    /**
     * A listener registered while this node is a candidate must still be delivered committed
     * data up to the known high watermark, without claiming any epoch.
     */
    @Test
    public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
        int localId = 0;
        int peerId = 1;
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(2, Arrays.asList("a", "b", "c"))
            .appendToLog(4, Arrays.asList("d", "e", "f"))
            .appendToLog(4, Arrays.asList("g", "h", "i"))
            .withUnknownLeader(epoch - 1)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(10L, context.log.endOffset().offset);

        // A fetch from the log end establishes a high watermark of 10.
        context.deliverRequest(context.fetchRequest(epoch, peerId, 10L, epoch, 0));
        context.pollUntilResponse();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // A vote request for the next epoch leaves this node without a known leader; the
        // high watermark is retained.
        context.deliverRequest(context.voteRequest(epoch + 1, peerId, epoch, 9L));
        context.pollUntilResponse();
        context.assertUnknownLeader(epoch + 1);
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // After the election timeout the node votes for itself in the following epoch.
        int candidateEpoch = epoch + 2;
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, localId);

        // Register a new listener; the candidate state must not change.
        RaftClientTestContext.MockListener lateListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(lateListener);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, localId);

        // The new listener catches up to offset 9 and never claims an epoch.
        context.pollUntil(() -> lateListener.lastCommitOffset().equals(OptionalLong.of(9L)));
        Assertions.assertEquals(OptionalLong.of(9L), lateListener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.empty(), lateListener.currentClaimedEpoch());
    }

    /**
     * A client built without a local id must still fetch from the voters: it learns the leader
     * and epoch from an error response to its first fetch, then fetches records from that
     * leader and appends them to its log.
     */
    @Test
    public void testObserverFetchWithNoLocalId() throws Exception {
        Set<Integer> voters = Utils.mkSet(1, 2);
        RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters)
            .build();

        // The first fetch goes to one of the voters with epoch 0.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest1.destinationId()));
        context.assertFetchRequestData(fetchRequest1, 0, 0L, 0);

        int leaderEpoch = 5;
        int leaderId = 1;

        // The FENCED_LEADER_EPOCH response reveals the leader and epoch. The response uses
        // leaderEpoch rather than a repeated literal so it cannot drift from the checks below.
        context.deliverResponse(
            fetchRequest1.correlationId,
            fetchRequest1.destinationId(),
            context.fetchResponse(leaderEpoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(leaderEpoch, leaderId);

        // The second fetch is addressed to the discovered leader in the discovered epoch.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, fetchRequest2.destinationId());
        context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0);

        // Deliver three records; they are expected to be appended to the local log.
        List<String> records = Arrays.asList("a", "b", "c");
        MemoryRecords batch1 = context.buildBatch(0L, 3, records);
        context.deliverResponse(
            fetchRequest2.correlationId,
            fetchRequest2.destinationId(),
            context.fetchResponse(leaderEpoch, leaderId, batch1, 0L, Errors.NONE));
        context.client.poll();
        Assertions.assertEquals(3L, context.log.endOffset().offset);
        Assertions.assertEquals(3, context.log.lastFetchedEpoch());
    }

    /**
     * Looks up a metric by name in the {@code "raft-metrics"} group.
     *
     * @param metrics the metrics registry to search
     * @param name the metric name within the group
     * @return the registered metric, or {@code null} if the registry's map has no entry for it
     */
    private static KafkaMetric getMetric(Metrics metrics, String name) {
        return metrics.metrics()
            .get(metrics.metricName(name, "raft-metrics"));
    }

    /**
     * Exercises {@code scheduleAtomicAppend} with a required base offset: a matching offset
     * is accepted and returned, a mismatching one raises
     * {@link UnexpectedBaseOffsetException}.
     *
     * @param correctOffset whether the required base offset passed to the append is the one
     *                      the test expects to be accepted (1) or a mismatching one (2)
     */
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testAppendWithRequiredBaseOffset(boolean correctOffset) throws Exception {
        int localId = 0;
        int peerId = 1;
        Set<Integer> voters = Utils.mkSet(localId, peerId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        if (!correctOffset) {
            // A required base offset of 2 is rejected.
            Assertions.assertThrows(
                UnexpectedBaseOffsetException.class,
                () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.of(2L), Collections.singletonList("a")));
        } else {
            // A required base offset of 1 is accepted and returned by the append.
            long appendOffset =
                context.client.scheduleAtomicAppend(epoch, OptionalLong.of(1L), Collections.singletonList("a"));
            Assertions.assertEquals(1L, appendOffset);

            // Then deliver a BeginQuorumEpoch request for the next epoch and wait for the response.
            context.deliverRequest(context.beginEpochRequest(epoch + 1, peerId));
            context.pollUntilResponse();
        }
    }
}

