/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftUtil;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public final class KafkaRaftClientSnapshotTest {
    /**
     * Builds a two-voter context whose log holds two batches and an empty
     * snapshot at (offset 3, epoch 1), then checks that the client reports
     * that snapshot id as its latest snapshot.
     */
    @Test
    public void testLatestSnapshotId() throws Exception {
        int localId = 0;
        int leaderId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        int epoch = 2;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .withEmptySnapshot(snapshotId)
            .withElectedLeader(epoch, leaderId)
            .build();

        Assertions.assertEquals(Optional.of(snapshotId), context.client.latestSnapshotId());
    }

    /**
     * Builds a two-voter context with two log batches but no snapshot, then
     * checks that the client reports an empty latest snapshot id.
     */
    @Test
    public void testLatestSnapshotIdMissing() throws Exception {
        int localId = 0;
        int leaderId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        int epoch = 2;
        // Only used for the epoch of the appended batches; no snapshot is created.
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .withElectedLeader(epoch, leaderId)
            .build();

        Assertions.assertEquals(Optional.empty(), context.client.latestSnapshotId());
    }

    /**
     * Checks that the listener of a leader is handed the existing snapshot.
     *
     * <p>The local node becomes leader, receives a fetch from the other voter
     * at the local log end offset, and the test asserts that the high
     * watermark equals that offset before draining the snapshot from the
     * listener.
     *
     * @param entireLog when {@code false}, the builder additionally calls
     *     {@code deleteBeforeSnapshot(snapshotId)} before the context is built
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLeaderListenerNotified(boolean entireLog) throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 1);

        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .withEmptySnapshot(snapshotId);
        if (!entireLog) {
            builder.deleteBeforeSnapshot(snapshotId);
        }

        RaftClientTestContext context = builder.build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // The other voter fetches at the leader's log end offset.
        long logEndOffset = context.log.endOffset().offset;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, logEndOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(logEndOffset, context.client.highWatermark().getAsLong());

        // The listener must have been given the (empty) snapshot.
        try (SnapshotReader<String> handled = context.listener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(snapshotId, handled.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), handled);
        }
    }

    /**
     * Checks that the listener of a follower is handed the existing snapshot.
     *
     * <p>The follower sends a fetch at its log end offset, receives an empty
     * response from the leader carrying that same offset, sends the next
     * fetch, and the test then drains the snapshot from the listener.
     *
     * @param entireLog when {@code false}, the builder additionally calls
     *     {@code deleteBeforeSnapshot(snapshotId)} before the context is built
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testFollowerListenerNotified(boolean entireLog) throws Exception {
        int localId = 0;
        int leaderId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        int epoch = 2;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 1);

        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .withEmptySnapshot(snapshotId)
            .withElectedLeader(epoch, leaderId);
        if (!entireLog) {
            builder.deleteBeforeSnapshot(snapshotId);
        }

        RaftClientTestContext context = builder.build();
        long logEndOffset = context.log.endOffset().offset;

        // First fetch sent by the follower.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, logEndOffset, snapshotId.epoch());

        // Leader answers with no records and the follower's own log end offset.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, logEndOffset, Errors.NONE));

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, logEndOffset, snapshotId.epoch());

        try (SnapshotReader<String> handled = context.listener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(snapshotId, handled.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), handled);
        }
    }

    /**
     * Checks that a listener registered after the follower has already
     * exchanged a fetch round-trip is also handed the existing snapshot.
     *
     * @param entireLog when {@code false}, the builder additionally calls
     *     {@code deleteBeforeSnapshot(snapshotId)} before the context is built
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testSecondListenerNotified(boolean entireLog) throws Exception {
        int localId = 0;
        int leaderId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        int epoch = 2;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 1);

        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .withEmptySnapshot(snapshotId)
            .withElectedLeader(epoch, leaderId);
        if (!entireLog) {
            builder.deleteBeforeSnapshot(snapshotId);
        }

        RaftClientTestContext context = builder.build();
        long logEndOffset = context.log.endOffset().offset;

        // One full fetch round-trip with an empty response from the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, logEndOffset, snapshotId.epoch());
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, logEndOffset, Errors.NONE));

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, logEndOffset, snapshotId.epoch());

        // Register a late listener and poll once so the client can notify it.
        RaftClientTestContext.MockListener lateListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(lateListener);
        context.client.poll();

        try (SnapshotReader<String> handled = lateListener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(snapshotId, handled.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), handled);
        }
    }

    /**
     * Checks that a listener is handed a newer snapshot after the log has been
     * trimmed up to that snapshot.
     *
     * <p>Sequence: become leader, turn off {@code readCommit} on the listener,
     * advance the high watermark through a fetch from the other voter, drain
     * the first snapshot, create and freeze a second snapshot at the log end
     * offset, delete the log before it, re-enable {@code readCommit}, and
     * finally assert that the listener is handed the second snapshot.
     */
    @Test
    public void testListenerRenotified() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("g", "h", "i"))
            .withEmptySnapshot(snapshotId)
            .deleteBeforeSnapshot(snapshotId)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();

        // NOTE(review): presumably stops the mock listener from consuming
        // committed batches until re-enabled below; confirm against MockListener.
        context.listener.updateReadCommit(false);

        // Advance the high watermark to the log end offset via a follower fetch.
        long localLogEndOffset = context.log.endOffset().offset;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong());

        // The listener is first handed the snapshot the context was built with.
        try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(snapshotId, snapshot.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
        }

        // Generate a new snapshot at the log end offset. createSnapshot yields a
        // writer (it is frozen here), not a reader.
        OffsetAndEpoch secondSnapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
        try (SnapshotWriter<String> writer = context.client.createSnapshot(secondSnapshotId, 0L).get()) {
            Assertions.assertEquals(secondSnapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.log.deleteBeforeSnapshot(secondSnapshotId);
        context.client.poll();

        // Re-enable commit reading and poll so the listener can be notified again.
        context.listener.updateReadCommit(true);
        context.client.poll();

        // The listener must now be handed the second snapshot.
        try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(secondSnapshotId, snapshot.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
        }
    }

    /**
     * Checks that a leader whose log has been trimmed to a snapshot answers a
     * fetch with that snapshot's id.
     *
     * <p>The context is built with a snapshot at (offset 3, epoch 4) and the
     * log deleted before it. The other voter then fetches at offset 6 with a
     * fourth argument of 2 and a non-zero (500) final argument, and the test
     * asserts that the partition response carries the snapshot id.
     */
    @Test
    public void testLeaderImmediatelySendsSnapshotId() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(3L, 4);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(snapshotId.epoch())
            .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .appendToLog(snapshotId.epoch(), Arrays.asList("g", "h", "i"))
            .withEmptySnapshot(snapshotId)
            .deleteBeforeSnapshot(snapshotId)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();

        // A single poll is used here rather than pollUntilResponse.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 6L, 2, 500));
        context.client.poll();

        FetchResponseData.PartitionData partition = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partition.currentLeader().leaderId());
        Assertions.assertEquals(snapshotId.epoch(), partition.snapshotId().epoch());
        Assertions.assertEquals(snapshotId.offset(), partition.snapshotId().endOffset());
    }

    /**
     * Checks that a fetch at an offset below the leader's log start is
     * answered with the id of the snapshot the log was trimmed to.
     *
     * <p>The leader appends three records, snapshots at the log end offset,
     * deletes the log before that snapshot, and then receives a fetch at
     * {@code snapshotId.offset() - 2}.
     */
    @Test
    public void testFetchRequestOffsetLessThanLogStart() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(1)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Append and wait out the linger so the records reach the log.
        List<String> appendRecords = Arrays.asList("a", "b", "c");
        context.client.scheduleAppend(epoch, appendRecords);
        context.time.sleep(context.appendLingerMs());
        context.client.poll();

        long localLogEndOffset = context.log.endOffset().offset;
        Assertions.assertTrue(
            appendRecords.size() <= localLogEndOffset,
            String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset));

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Snapshot at the log end offset, then trim the log up to it.
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
        try (SnapshotWriter<String> writer = context.client.createSnapshot(snapshotId, 0L).get()) {
            Assertions.assertEquals(snapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.log.deleteBeforeSnapshot(snapshotId);
        context.client.poll();

        // Fetch two offsets before the snapshot's end offset.
        long fetchOffset = snapshotId.offset() - 2L;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, fetchOffset, snapshotId.epoch(), 0));
        context.pollUntilResponse();

        FetchResponseData.PartitionData partition = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partition.currentLeader().leaderId());
        Assertions.assertEquals(snapshotId.epoch(), partition.snapshotId().epoch());
        Assertions.assertEquals(snapshotId.offset(), partition.snapshotId().endOffset());
    }

    /**
     * Checks that a fetch at offset 0 with epoch 0 is answered with the
     * leader's snapshot id once a snapshot exists at the log end offset.
     *
     * <p>Unlike the "less than log start" test, the log is not trimmed here.
     */
    @Test
    public void testFetchRequestOffsetAtZero() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(1)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Append and wait out the linger so the records reach the log.
        List<String> appendRecords = Arrays.asList("a", "b", "c");
        context.client.scheduleAppend(epoch, appendRecords);
        context.time.sleep(context.appendLingerMs());
        context.client.poll();

        long localLogEndOffset = context.log.endOffset().offset;
        Assertions.assertTrue(
            appendRecords.size() <= localLogEndOffset,
            String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset));

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Snapshot at the log end offset.
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
        try (SnapshotWriter<String> writer = context.client.createSnapshot(snapshotId, 0L).get()) {
            Assertions.assertEquals(snapshotId, writer.snapshotId());
            writer.freeze();
        }

        // Fetch from the very beginning of the log.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, 0, 0));
        context.pollUntilResponse();

        FetchResponseData.PartitionData partition = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partition.currentLeader().leaderId());
        Assertions.assertEquals(snapshotId.epoch(), partition.snapshotId().epoch());
        Assertions.assertEquals(snapshotId.offset(), partition.snapshotId().endOffset());
    }

    /**
     * Checks that a fetch carrying an epoch one greater than the leader's
     * current epoch (the "last fetched epoch" of the test name) is rejected
     * with {@code INVALID_REQUEST}.
     */
    @Test
    public void testFetchRequestWithLargerLastFetchedEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3L, 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .withAppendLingerMs(1)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();
        Assertions.assertEquals(oldestSnapshotId.epoch() + 1, epoch);

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Create a snapshot at the end of the first batch.
        try (SnapshotWriter<String> writer = context.client.createSnapshot(oldestSnapshotId, 0L).get()) {
            Assertions.assertEquals(oldestSnapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.client.poll();

        // Append more records in the leader's epoch.
        context.client.scheduleAppend(epoch, Arrays.asList("g", "h", "i"));
        context.time.sleep(context.appendLingerMs());
        context.client.poll();

        // Fetch just past the snapshot with an epoch larger than the leader's.
        long fetchOffset = oldestSnapshotId.offset() + 1L;
        int tooLargeEpoch = epoch + 1;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, fetchOffset, tooLargeEpoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
    }

    /**
     * Checks that a fetch just past the oldest snapshot, carrying an epoch
     * that lies between the two epochs present in the leader's log, is
     * answered with a diverging epoch equal to the oldest snapshot id.
     *
     * <p>The log holds one batch at epoch 2 and one at epoch 4; the fetch
     * uses epoch 3 at offset {@code oldestSnapshotId.offset() + 1}.
     */
    @Test
    public void testFetchRequestTruncateToLogStart() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        int syncNodeId = otherNodeId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, syncNodeId);
        OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3L, 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("d", "e", "f"))
            .withAppendLingerMs(1)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();
        Assertions.assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch);

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        try (SnapshotWriter<String> writer = context.client.createSnapshot(oldestSnapshotId, 0L).get()) {
            Assertions.assertEquals(oldestSnapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.client.poll();

        long fetchOffset = oldestSnapshotId.offset() + 1L;
        int fetchedEpoch = oldestSnapshotId.epoch() + 1;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, fetchOffset, fetchedEpoch, 0));
        context.pollUntilResponse();

        FetchResponseData.PartitionData partition = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partition.currentLeader().leaderId());
        Assertions.assertEquals(oldestSnapshotId.epoch(), partition.divergingEpoch().epoch());
        Assertions.assertEquals(oldestSnapshotId.offset(), partition.divergingEpoch().endOffset());
    }

    /**
     * Checks that a fetch positioned exactly at the oldest snapshot's offset
     * and epoch is answered with {@code Errors.NONE} by the leader.
     */
    @Test
    public void testFetchRequestAtLogStartOffsetWithValidEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        int syncNodeId = otherNodeId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, syncNodeId);
        OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3L, 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("g", "h", "i"))
            .withAppendLingerMs(1)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();
        Assertions.assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch);

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        try (SnapshotWriter<String> writer = context.client.createSnapshot(oldestSnapshotId, 0L).get()) {
            Assertions.assertEquals(oldestSnapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.client.poll();

        // Fetch at exactly the snapshot's offset and epoch.
        context.deliverRequest(
            context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset(), oldestSnapshotId.epoch(), 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
    }

    /**
     * Checks that, after the log is trimmed to the oldest snapshot, a fetch at
     * the snapshot's offset but with an epoch one greater than the snapshot's
     * epoch is answered with the snapshot id.
     */
    @Test
    public void testFetchRequestAtLogStartOffsetWithInvalidEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        int syncNodeId = otherNodeId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, syncNodeId);
        OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3L, 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("g", "h", "i"))
            .withAppendLingerMs(1)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();
        Assertions.assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch);

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Snapshot, then trim the log up to the snapshot.
        try (SnapshotWriter<String> writer = context.client.createSnapshot(oldestSnapshotId, 0L).get()) {
            Assertions.assertEquals(oldestSnapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.log.deleteBeforeSnapshot(oldestSnapshotId);
        context.client.poll();

        // Same offset as the snapshot, but an epoch that does not match it.
        int mismatchedEpoch = oldestSnapshotId.epoch() + 1;
        context.deliverRequest(
            context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset(), mismatchedEpoch, 0));
        context.pollUntilResponse();

        FetchResponseData.PartitionData partition = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partition.currentLeader().leaderId());
        Assertions.assertEquals(oldestSnapshotId.epoch(), partition.snapshotId().epoch());
        Assertions.assertEquals(oldestSnapshotId.offset(), partition.snapshotId().endOffset());
    }

    /**
     * Checks that a fetch at the log end offset carrying an epoch one less
     * than the oldest snapshot's epoch is answered with the oldest snapshot
     * id.
     */
    @Test
    public void testFetchRequestWithLastFetchedEpochLessThanOldestSnapshot() throws Exception {
        int localId = 0;
        int otherNodeId = localId + 1;
        int syncNodeId = otherNodeId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, syncNodeId);
        OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3L, 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c"))
            .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f"))
            .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("g", "h", "i"))
            .withAppendLingerMs(1)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();
        Assertions.assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch);

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        try (SnapshotWriter<String> writer = context.client.createSnapshot(oldestSnapshotId, 0L).get()) {
            Assertions.assertEquals(oldestSnapshotId, writer.snapshotId());
            writer.freeze();
        }
        context.client.poll();

        // Fetch at the log end offset with an epoch older than the snapshot's.
        long fetchOffset = context.log.endOffset().offset;
        int olderEpoch = oldestSnapshotId.epoch() - 1;
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, fetchOffset, olderEpoch, 0));
        context.pollUntilResponse();

        FetchResponseData.PartitionData partition = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(epoch, partition.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partition.currentLeader().leaderId());
        Assertions.assertEquals(oldestSnapshotId.epoch(), partition.snapshotId().epoch());
        Assertions.assertEquals(oldestSnapshotId.offset(), partition.snapshotId().endOffset());
    }

    /**
     * Checks that a leader answers a FetchSnapshot request for a snapshot id
     * it does not have, here (offset 0, epoch 0), with
     * {@code SNAPSHOT_NOT_FOUND}.
     */
    @Test
    public void testFetchSnapshotRequestMissingSnapshot() throws Exception {
        int localId = 0;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        OffsetAndEpoch missingSnapshotId = new OffsetAndEpoch(0L, 0);
        context.deliverRequest(
            fetchSnapshotRequest(context.metadataPartition, epoch, missingSnapshotId, Integer.MAX_VALUE, 0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot response =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Assertions.assertEquals(Errors.SNAPSHOT_NOT_FOUND, Errors.forCode(response.errorCode()));
    }

    /**
     * Checks that a leader answers a FetchSnapshot request addressed to a
     * topic partition other than the metadata partition (topic "unknown",
     * partition 0) with {@code UNKNOWN_TOPIC_OR_PARTITION}.
     */
    @Test
    public void testFetchSnapshotRequestUnknownPartition() throws Exception {
        int localId = 0;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
        int epoch = 2;
        TopicPartition unknownPartition = new TopicPartition("unknown", 0);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        OffsetAndEpoch requestedSnapshotId = new OffsetAndEpoch(0L, 0);
        context.deliverRequest(
            fetchSnapshotRequest(unknownPartition, epoch, requestedSnapshotId, Integer.MAX_VALUE, 0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot response =
            context.assertSentFetchSnapshotResponse(unknownPartition).get();
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(response.errorCode()));
    }

    /**
     * Checks that a leader serves an entire snapshot in a single FetchSnapshot
     * response when the request's max bytes is {@code Integer.MAX_VALUE} and
     * the position is 0.
     *
     * <p>The response must report the snapshot's full size, position 0, and
     * carry bytes equal to the whole snapshot read back from the log.
     */
    @Test
    public void testFetchSnapshotRequestAsLeader() throws Exception {
        int localId = 0;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(1L, 1);
        List<String> records = Arrays.asList("foo", "bar");

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a"))
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Write and freeze a snapshot containing the two records.
        try (SnapshotWriter<String> writer = context.client.createSnapshot(snapshotId, 0L).get()) {
            Assertions.assertEquals(snapshotId, writer.snapshotId());
            writer.append(records);
            writer.freeze();
        }

        // Read the frozen snapshot back from the log; this is the expected payload.
        RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();

        context.deliverRequest(
            fetchSnapshotRequest(context.metadataPartition, epoch, snapshotId, Integer.MAX_VALUE, 0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot response =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();

        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
        Assertions.assertEquals(snapshot.sizeInBytes(), response.size());
        Assertions.assertEquals(0L, response.position());
        Assertions.assertEquals(snapshot.sizeInBytes(), response.unalignedRecords().sizeInBytes());

        UnalignedMemoryRecords memoryRecords =
            (UnalignedMemoryRecords) snapshot.slice(0L, Math.toIntExact(snapshot.sizeInBytes()));
        Assertions.assertEquals(
            memoryRecords.buffer(),
            ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
    }

    /**
     * Checks that a leader serves a snapshot in two chunks.
     *
     * <p>The first request limits max bytes to half the snapshot size at
     * position 0; the second asks for the remainder starting at the position
     * reached so far. After both responses the accumulated bytes must equal
     * the whole snapshot read back from the log.
     */
    @Test
    public void testPartialFetchSnapshotRequestAsLeader() throws Exception {
        int localId = 0;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(2L, 1);
        List<String> records = Arrays.asList("foo", "bar");

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), records)
            .build();

        context.becomeLeader();
        int epoch = context.currentEpoch();

        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Write and freeze a snapshot containing the two records.
        try (SnapshotWriter<String> writer = context.client.createSnapshot(snapshotId, 0L).get()) {
            Assertions.assertEquals(snapshotId, writer.snapshotId());
            writer.append(records);
            writer.freeze();
        }

        // Read the frozen snapshot back from the log; this is the expected payload.
        RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();

        // First chunk: at most half of the snapshot, starting at position 0.
        context.deliverRequest(
            fetchSnapshotRequest(
                context.metadataPartition,
                epoch,
                snapshotId,
                Math.toIntExact(snapshot.sizeInBytes() / 2L),
                0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot response =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
        Assertions.assertEquals(snapshot.sizeInBytes(), response.size());
        Assertions.assertEquals(0L, response.position());
        Assertions.assertEquals(snapshot.sizeInBytes() / 2L, response.unalignedRecords().sizeInBytes());

        UnalignedMemoryRecords memoryRecords =
            (UnalignedMemoryRecords) snapshot.slice(0L, Math.toIntExact(snapshot.sizeInBytes()));
        ByteBuffer snapshotBuffer = memoryRecords.buffer();

        // Accumulates the bytes received across both responses.
        ByteBuffer responseBuffer = ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes()));
        responseBuffer.put(((UnalignedMemoryRecords) response.unalignedRecords()).buffer());

        // The first chunk must match the first half of the snapshot bytes.
        ByteBuffer expectedBytes = snapshotBuffer.duplicate();
        expectedBytes.limit(Math.toIntExact(snapshot.sizeInBytes() / 2L));
        Assertions.assertEquals(expectedBytes, responseBuffer.duplicate().flip());

        // Second chunk: everything from the current position onwards.
        context.deliverRequest(
            fetchSnapshotRequest(
                context.metadataPartition,
                epoch,
                snapshotId,
                Integer.MAX_VALUE,
                responseBuffer.position()));
        context.client.poll();

        response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
        Assertions.assertEquals(snapshot.sizeInBytes(), response.size());
        Assertions.assertEquals(responseBuffer.position(), response.position());
        Assertions.assertEquals(
            snapshot.sizeInBytes() - snapshot.sizeInBytes() / 2L,
            response.unalignedRecords().sizeInBytes());

        // Both chunks together must reproduce the snapshot exactly.
        responseBuffer.put(((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
        Assertions.assertEquals(snapshotBuffer, responseBuffer.flip());
    }

    /**
     * A node that is following an elected leader must reject an incoming FetchSnapshot
     * request with {@code NOT_LEADER_OR_FOLLOWER} and report the elected leader's epoch
     * and id in the response.
     */
    @Test
    public void testFetchSnapshotRequestAsFollower() throws IOException {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0L, 0);

        // The local node starts out knowing that the other voter leads this epoch.
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
            context.metadataPartition, epoch, snapshotId, Integer.MAX_VALUE, 0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot partitionResponse =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Errors error = Errors.forCode(partitionResponse.errorCode());
        Assertions.assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, error);
        Assertions.assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(leaderId, partitionResponse.currentLeader().leaderId());
    }

    /**
     * Checks that the leader rejects FetchSnapshot requests whose position is outside
     * the snapshot. Both a negative position and a position equal to the snapshot size
     * must yield {@code POSITION_OUT_OF_RANGE} together with the leader's epoch and id.
     */
    @Test
    public void testFetchSnapshotRequestWithInvalidPosition() throws Exception {
        final int localId = 0;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(1L, 1);
        List<String> snapshotRecords = Arrays.asList("foo", "bar");

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(snapshotId.epoch(), Arrays.asList("a"))
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();
        context.advanceLocalLeaderHighWatermarkToLogEndOffset();

        // Materialize the snapshot that both requests below will target.
        try (SnapshotWriter<String> writer = context.client.createSnapshot(snapshotId, 0L).get()) {
            Assertions.assertEquals(snapshotId, writer.snapshotId());
            writer.append(snapshotRecords);
            writer.freeze();
        }

        // Case 1: a negative position.
        context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
            context.metadataPartition, epoch, snapshotId, Integer.MAX_VALUE, -1L));
        context.client.poll();
        FetchSnapshotResponseData.PartitionSnapshot response =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Assertions.assertEquals(Errors.POSITION_OUT_OF_RANGE, Errors.forCode(response.errorCode()));
        Assertions.assertEquals(epoch, response.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, response.currentLeader().leaderId());

        // Case 2: a position equal to the snapshot size, i.e. one past the last byte.
        RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
        long positionAtEnd = snapshot.sizeInBytes();
        context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
            context.metadataPartition, epoch, snapshotId, Integer.MAX_VALUE, positionAtEnd));
        context.client.poll();
        response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Assertions.assertEquals(Errors.POSITION_OUT_OF_RANGE, Errors.forCode(response.errorCode()));
        Assertions.assertEquals(epoch, response.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, response.currentLeader().leaderId());
    }

    /**
     * A leader that receives a FetchSnapshot request stamped with an epoch older than
     * its own must answer {@code FENCED_LEADER_EPOCH} and report itself as the current
     * leader.
     */
    @Test
    public void testFetchSnapshotRequestWithOlderEpoch() throws Exception {
        final int localId = 0;
        final int epoch = 2;
        final int staleEpoch = epoch - 1;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0L, 0);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
            context.metadataPartition, staleEpoch, snapshotId, Integer.MAX_VALUE, 0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot partitionResponse =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Errors error = Errors.forCode(partitionResponse.errorCode());
        Assertions.assertEquals(Errors.FENCED_LEADER_EPOCH, error);
        Assertions.assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partitionResponse.currentLeader().leaderId());
    }

    /**
     * A leader that receives a FetchSnapshot request stamped with an epoch newer than
     * its own must answer {@code UNKNOWN_LEADER_EPOCH} while still reporting its own
     * epoch and id as the current leader.
     */
    @Test
    public void testFetchSnapshotRequestWithNewerEpoch() throws Exception {
        final int localId = 0;
        final int epoch = 2;
        final int futureEpoch = epoch + 1;
        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0L, 0);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
            context.metadataPartition, futureEpoch, snapshotId, Integer.MAX_VALUE, 0L));
        context.client.poll();

        FetchSnapshotResponseData.PartitionSnapshot partitionResponse =
            context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
        Errors error = Errors.forCode(partitionResponse.errorCode());
        Assertions.assertEquals(Errors.UNKNOWN_LEADER_EPOCH, error);
        Assertions.assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partitionResponse.currentLeader().leaderId());
    }

    /**
     * A follower that receives a Fetch response carrying an invalid snapshot id (a
     * negative epoch, or a negative end offset) must not start fetching a snapshot.
     * After each invalid response it waits for the retry back-off and sends a regular
     * Fetch request again. The test then sleeps for whatever is left of the fetch
     * timeout and expects the node to send a vote request for the next epoch, which
     * shows the invalid responses did not reset the fetch timer.
     */
    @Test
    public void testFetchResponseWithInvalidSnapshotId() throws Exception {
        int localId = 0;
        int leaderId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        int epoch = 2;
        OffsetAndEpoch invalidEpoch = new OffsetAndEpoch(100L, -1);
        OffsetAndEpoch invalidEndOffset = new OffsetAndEpoch(-1L, 1);
        // Mock time consumed by back-offs so far; subtracted from the fetch timeout below.
        int slept = 0;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);

        // First invalid response: the snapshot id has a negative epoch.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, invalidEpoch, 200L));
        context.client.poll();

        // Sleep exactly the configured back-off so the sleep and the `slept`
        // bookkeeping cannot drift apart.
        context.time.sleep(context.retryBackoffMs);
        slept += context.retryBackoffMs;

        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);

        // Second invalid response: the snapshot id has a negative end offset.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, invalidEndOffset, 200L));
        context.client.poll();

        context.time.sleep(context.retryBackoffMs);
        slept += context.retryBackoffMs;

        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);

        // Sleep for the remainder of the fetch timeout; an election is expected next.
        context.time.sleep(context.fetchTimeoutMs - slept);

        context.pollUntilRequest();
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * Happy path for snapshot fetching on a follower. A Fetch response carrying a
     * snapshot id makes the follower send a FetchSnapshot request at position 0. Once
     * the whole snapshot arrives in a single response, the follower goes back to
     * Fetch requests at the snapshot's end offset and epoch, the snapshot can be read
     * from the log, and the listener is handed a reader for it.
     */
    @Test
    public void testFetchResponseWithSnapshotId() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        // The follower must now ask for that snapshot from the beginning.
        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // Build the snapshot bytes that the leader will send back.
        List<String> records = Arrays.asList("foo", "bar");
        MemorySnapshotWriter memorySnapshot = new MemorySnapshotWriter(snapshotId);
        try (SnapshotWriter<String> writer = KafkaRaftClientSnapshotTest.snapshotWriter(context, memorySnapshot)) {
            writer.append(records);
            writer.freeze();
        }

        // Deliver the entire snapshot in one response.
        ApiMessage fullSnapshotResponse = KafkaRaftClientSnapshotTest.fetchSnapshotResponse(
            context.metadataPartition,
            epoch,
            leaderId,
            snapshotId,
            memorySnapshot.buffer().remaining(),
            0L,
            memorySnapshot.buffer().slice());
        context.deliverResponse(snapshotRequest.correlationId, snapshotRequest.destinationId(), fullSnapshotResponse);

        // Fetching resumes from the end of the snapshot.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch());

        // The snapshot is stored in the log with the expected size and contents.
        RawSnapshotReader storedSnapshot = context.log.readSnapshot(snapshotId).get();
        Assertions.assertEquals((long) memorySnapshot.buffer().remaining(), storedSnapshot.sizeInBytes());
        SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), storedSnapshot);

        // The listener was handed the same snapshot.
        try (SnapshotReader<String> handled = context.listener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(snapshotId, handled.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), handled);
        }
    }

    /**
     * Snapshot fetching when the leader returns the snapshot in two pieces. After the
     * first half arrives, the follower must send another FetchSnapshot request whose
     * position equals the number of bytes already received. After the remainder
     * arrives, it resumes Fetch requests at the snapshot's end offset and epoch, and
     * both the log and the listener expose the complete snapshot.
     */
    @Test
    public void testFetchSnapshotResponsePartialData() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // Build the snapshot bytes that the leader will send back.
        List<String> records = Arrays.asList("foo", "bar");
        MemorySnapshotWriter memorySnapshot = new MemorySnapshotWriter(snapshotId);
        try (SnapshotWriter<String> writer = KafkaRaftClientSnapshotTest.snapshotWriter(context, memorySnapshot)) {
            writer.append(records);
            writer.freeze();
        }

        // First response: only the first half of the snapshot bytes.
        ByteBuffer firstHalf = memorySnapshot.buffer().slice();
        firstHalf.limit(firstHalf.limit() / 2);
        context.deliverResponse(
            snapshotRequest.correlationId,
            snapshotRequest.destinationId(),
            KafkaRaftClientSnapshotTest.fetchSnapshotResponse(
                context.metadataPartition,
                epoch,
                leaderId,
                snapshotId,
                memorySnapshot.buffer().remaining(),
                0L,
                firstHalf));

        // The follower must continue from where the first chunk ended.
        context.pollUntilRequest();
        snapshotRequest = context.assertSentFetchSnapshotRequest();
        request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals((long) firstHalf.limit(), request.position());

        // Second response: everything from the requested position to the end.
        ByteBuffer remainder = memorySnapshot.buffer().slice();
        remainder.position(Math.toIntExact(request.position()));
        context.deliverResponse(
            snapshotRequest.correlationId,
            snapshotRequest.destinationId(),
            KafkaRaftClientSnapshotTest.fetchSnapshotResponse(
                context.metadataPartition,
                epoch,
                leaderId,
                snapshotId,
                memorySnapshot.buffer().remaining(),
                request.position(),
                remainder));

        // Fetching resumes from the end of the snapshot.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch());

        // The reassembled snapshot is stored in the log with the expected size and contents.
        RawSnapshotReader storedSnapshot = context.log.readSnapshot(snapshotId).get();
        Assertions.assertEquals((long) memorySnapshot.buffer().remaining(), storedSnapshot.sizeInBytes());
        SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), storedSnapshot);

        try (SnapshotReader<String> handled = context.listener.drainHandledSnapshot().get()) {
            Assertions.assertEquals(snapshotId, handled.snapshotId());
            SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), handled);
        }
    }

    /**
     * When the leader answers a FetchSnapshot request with {@code SNAPSHOT_NOT_FOUND},
     * the follower must abandon the snapshot and go back to sending a regular Fetch
     * request with the same arguments as its initial one.
     */
    @Test
    public void testFetchSnapshotResponseMissingSnapshot() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // The leader reports that the snapshot no longer exists.
        ApiMessage notFoundResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(epoch).setLeaderId(leaderId);
            return partition.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
        });
        context.deliverResponse(snapshotRequest.correlationId, snapshotRequest.destinationId(), notFoundResponse);

        // The follower falls back to a plain Fetch request.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
    }

    /**
     * When a FetchSnapshot response carries {@code FENCED_LEADER_EPOCH} and names a
     * different node as leader of a newer epoch, the follower must stop fetching the
     * snapshot and send a regular Fetch request in the newer epoch.
     */
    @Test
    public void testFetchSnapshotResponseFromNewerEpochNotLeader() throws Exception {
        final int localId = 0;
        final int firstLeaderId = localId + 1;
        final int secondLeaderId = firstLeaderId + 1;
        final int epoch = 2;
        final int newerEpoch = epoch + 1;
        Set<Integer> voters = Utils.mkSet(localId, firstLeaderId, secondLeaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, firstLeaderId)
            .build();

        // The first leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, firstLeaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // The response fences the request and announces a new leader in a newer epoch.
        ApiMessage fencedResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(newerEpoch).setLeaderId(secondLeaderId);
            return partition.setErrorCode(Errors.FENCED_LEADER_EPOCH.code());
        });
        context.deliverResponse(snapshotRequest.correlationId, snapshotRequest.destinationId(), fencedResponse);

        // The follower restarts with a plain Fetch in the newer epoch.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, newerEpoch, 0L, 0);
    }

    /**
     * When a FetchSnapshot response carries {@code FENCED_LEADER_EPOCH} and names the
     * same node as leader of a newer epoch, the follower must stop fetching the
     * snapshot and send a regular Fetch request in the newer epoch.
     */
    @Test
    public void testFetchSnapshotResponseFromNewerEpochLeader() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        final int newerEpoch = epoch + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // The same leader fences the request because it has moved on to a newer epoch.
        ApiMessage fencedResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(newerEpoch).setLeaderId(leaderId);
            return partition.setErrorCode(Errors.FENCED_LEADER_EPOCH.code());
        });
        context.deliverResponse(snapshotRequest.correlationId, snapshotRequest.destinationId(), fencedResponse);

        // The follower restarts with a plain Fetch in the newer epoch.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, newerEpoch, 0L, 0);
    }

    /**
     * When a FetchSnapshot response carries {@code UNKNOWN_LEADER_EPOCH} and leader
     * information from an older epoch, the follower must keep its snapshot-fetching
     * state: its next request is again a FetchSnapshot request for the same snapshot
     * id at position 0.
     */
    @Test
    public void testFetchSnapshotResponseFromOlderEpoch() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // A response that refers to an older epoch and a different leader id.
        ApiMessage staleResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(epoch - 1).setLeaderId(leaderId + 1);
            return partition.setErrorCode(Errors.UNKNOWN_LEADER_EPOCH.code());
        });
        context.deliverResponse(snapshotRequest.correlationId, snapshotRequest.destinationId(), staleResponse);

        // The follower requests the same snapshot again, from the beginning.
        context.pollUntilRequest();
        snapshotRequest = context.assertSentFetchSnapshotRequest();
        request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());
    }

    /**
     * A FetchSnapshot response whose snapshot id is invalid (an end offset of -1, or
     * an epoch of -1) must make the follower drop the snapshot fetch and send a
     * regular Fetch request again. Both invalid variants are exercised in turn.
     */
    @Test
    public void testFetchSnapshotResponseWithInvalidId() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // Round 1: the leader points at a snapshot, then replies with a bad end offset.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        ApiMessage invalidEndOffsetResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(epoch).setLeaderId(leaderId);
            partition.snapshotId().setEndOffset(-1L).setEpoch(snapshotId.epoch());
            return partition;
        });
        context.deliverResponse(
            snapshotRequest.correlationId, snapshotRequest.destinationId(), invalidEndOffsetResponse);

        // The follower falls back to a plain Fetch request.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);

        // Round 2: the leader points at the snapshot again, then replies with a bad epoch.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        snapshotRequest = context.assertSentFetchSnapshotRequest();
        request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        ApiMessage invalidEpochResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(epoch).setLeaderId(leaderId);
            partition.snapshotId().setEndOffset(snapshotId.offset()).setEpoch(-1);
            return partition;
        });
        context.deliverResponse(
            snapshotRequest.correlationId, snapshotRequest.destinationId(), invalidEpochResponse);

        // Again the follower falls back to a plain Fetch request.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
    }

    /**
     * A FetchSnapshot response that arrives after the node has already started an
     * election must not change its state. The node sends a FetchSnapshot request, the
     * mock clock is advanced until it sends a vote request for the next epoch, and the
     * late snapshot response is then delivered. Afterwards the node must still be
     * recorded as having voted for itself in that epoch.
     */
    @Test
    public void testFetchSnapshotResponseToNotFollower() throws Exception {
        final int localId = 0;
        final int leaderId = localId + 1;
        final int epoch = 2;
        final int candidateEpoch = epoch + 1;
        Set<Integer> voters = Utils.mkSet(localId, leaderId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The leader answers the initial Fetch by pointing at a snapshot.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            KafkaRaftClientSnapshotTest.snapshotFetchResponse(
                context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L));

        context.pollUntilRequest();
        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
        FetchSnapshotRequestData.PartitionSnapshot request = KafkaRaftClientSnapshotTest
            .assertFetchSnapshotRequest(snapshotRequest, context.metadataPartition, localId, Integer.MAX_VALUE)
            .get();
        Assertions.assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
        Assertions.assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
        Assertions.assertEquals(0L, request.position());

        // NOTE(review): 50000 ms is presumably the fetch timeout (context.fetchTimeoutMs)
        // inlined by the compiler; confirm before replacing the literal.
        context.time.sleep(50000L);

        // After the sleep the node is expected to be campaigning in the next epoch.
        context.pollUntilRequest();
        context.assertSentVoteRequest(candidateEpoch, 0, 0L, 1);
        context.assertVotedCandidate(candidateEpoch, localId);

        // Deliver the now-stale snapshot response from the previous leader.
        ApiMessage lateSnapshotResponse = FetchSnapshotResponse.singleton(context.metadataPartition, partition -> {
            partition.currentLeader().setLeaderEpoch(epoch).setLeaderId(leaderId);
            partition.snapshotId().setEndOffset(snapshotId.offset()).setEpoch(snapshotId.epoch());
            return partition;
        });
        context.deliverResponse(snapshotRequest.correlationId, snapshotRequest.destinationId(), lateSnapshotResponse);
        context.client.poll();

        // The late response must not have changed the vote.
        context.assertVotedCandidate(candidateEpoch, localId);
    }

    /**
     * Cluster id validation on FetchSnapshot requests handled by a leader. A request
     * carrying the context's own cluster id, or no cluster id at all ({@code null}),
     * gets a partition-level response. A request with an empty or non-matching cluster
     * id is answered with {@code INCONSISTENT_CLUSTER_ID}.
     */
    @Test
    public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
        final int localId = 0;
        final int otherNodeId = 1;
        final int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0L, 0);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Accepted: the matching cluster id, then a missing (null) cluster id.
        for (String acceptedClusterId : Arrays.asList(context.clusterId.toString(), null)) {
            context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
                acceptedClusterId, context.metadataPartition, epoch, snapshotId, Integer.MAX_VALUE, 0L));
            context.pollUntilResponse();
            context.assertSentFetchSnapshotResponse(context.metadataPartition);
        }

        // Rejected: an empty cluster id, then an id that does not match.
        for (String rejectedClusterId : Arrays.asList("", "invalid-uuid")) {
            context.deliverRequest(KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
                rejectedClusterId, context.metadataPartition, epoch, snapshotId, Integer.MAX_VALUE, 0L));
            context.pollUntilResponse();
            context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
        }
    }

    /**
     * Verifies that a leader rejects {@code createSnapshot} with an
     * {@link IllegalArgumentException} for four kinds of snapshot id: one requested
     * while the high watermark is still empty, one whose offset is beyond the high
     * watermark, one whose epoch is greater than the current epoch, and one whose
     * offset is beyond the log's end offset for the given epoch.
     */
    @Test
    public void testCreateSnapshotAsLeaderWithInvalidSnapshotId() throws Exception {
        final int localId = 0;
        final int otherNodeId = localId + 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        List<String> appendRecords = Arrays.asList("a", "b", "c");
        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(4L, epoch);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(epoch, appendRecords)
            .withAppendLingerMs(1)
            .build();
        context.becomeLeader();
        final int currentEpoch = context.currentEpoch();

        // 1) No high watermark has been established yet.
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.createSnapshot(invalidSnapshotId1, 0L));

        // Establish the high watermark, then append more records so that the log end
        // offset is ahead of it.
        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
        List<String> newRecords = Arrays.asList("d", "e", "f");
        context.client.scheduleAppend(currentEpoch, newRecords);
        context.time.sleep((long) context.appendLingerMs());
        context.client.poll();

        long logEndOffset = context.log.endOffset().offset;
        long expectedLogEndOffset = context.client.highWatermark().getAsLong() + (long) newRecords.size();
        Assertions.assertEquals(logEndOffset, expectedLogEndOffset);

        // 2) The offset is beyond the high watermark.
        OffsetAndEpoch invalidSnapshotId2 =
            new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2L, currentEpoch);
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.createSnapshot(invalidSnapshotId2, 0L));

        // 3) The epoch is greater than the leader's current epoch.
        OffsetAndEpoch invalidSnapshotId3 =
            new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.createSnapshot(invalidSnapshotId3, 0L));

        // 4) The offset is beyond the end offset the log reports for the epoch.
        OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(epoch);
        Assertions.assertEquals(epoch, endOffsetForEpoch.epoch());
        OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset() + 2L, epoch);
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.createSnapshot(invalidSnapshotId4, 0L));
    }

    /**
     * Checks that a follower's {@code createSnapshot} call throws
     * {@link IllegalArgumentException} for four snapshot ids:
     * one requested while the high watermark is still empty, one whose offset is
     * past the high watermark, one whose epoch is past the current epoch, and one
     * whose offset is past {@code endOffsetForEpoch(3)} after the high watermark has
     * advanced to 6.
     */
    @Test
    public void testCreateSnapshotAsFollowerWithInvalidSnapshotId() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherFollowerId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, leaderId, otherFollowerId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .build();
        context.assertElectedLeader(epoch, leaderId);

        // Case 1: the high watermark is still empty, so the request must be rejected.
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(1L, 0);
        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1, 0L));

        // First fetch round: the response carries records "a", "b", "c" and a high watermark of 0.
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
        List<String> records1 = Arrays.asList("a", "b", "c");
        MemoryRecords batch1 = context.buildBatch(0L, 3, records1);
        context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(), context.fetchResponse(epoch, leaderId, batch1, 0L, Errors.NONE));
        context.client.poll();

        // Case 2: snapshot offset lies beyond the high watermark.
        int currentEpoch = context.currentEpoch();
        OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1L, currentEpoch);
        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2, 0L));

        // Case 3: snapshot epoch is greater than the current epoch.
        OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1L, currentEpoch + 1);
        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3, 0L));

        // Second fetch round: the response carries records "d", "e", "f" and raises the high watermark to 6.
        context.pollUntilRequest();
        fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
        List<String> records2 = Arrays.asList("d", "e", "f");
        MemoryRecords batch2 = context.buildBatch(3L, 4, records2);
        context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(), context.fetchResponse(epoch, leaderId, batch2, 6L, Errors.NONE));
        context.client.poll();
        Assertions.assertEquals(6L, context.client.highWatermark().getAsLong());

        // Case 4: snapshot offset lies beyond the log's end offset for epoch 3.
        OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3);
        Assertions.assertEquals(3, endOffsetForEpoch.epoch());
        OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset() + 1L, epoch);
        Assertions.assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4, 0L));
    }

    /**
     * Convenience overload that builds a FetchSnapshot request with a {@code null}
     * cluster id and otherwise forwards every argument unchanged to the
     * six-argument variant.
     */
    private static FetchSnapshotRequestData fetchSnapshotRequest(
        TopicPartition topicPartition,
        int epoch,
        OffsetAndEpoch offsetAndEpoch,
        int maxBytes,
        long position
    ) {
        String absentClusterId = null;
        return KafkaRaftClientSnapshotTest.fetchSnapshotRequest(
            absentClusterId,
            topicPartition,
            epoch,
            offsetAndEpoch,
            maxBytes,
            position
        );
    }

    /**
     * Builds a FetchSnapshot request for a single topic partition.
     *
     * @param clusterId cluster id passed to {@code FetchSnapshotRequest.singleton}; may be {@code null}
     * @param topicPartition partition the request targets
     * @param epoch value stored as the partition's current leader epoch
     * @param offsetAndEpoch supplies the end offset and epoch of the requested snapshot id
     * @param maxBytes value stored as the request's max bytes
     * @param position value stored as the partition's position
     * @return the populated request data
     */
    private static FetchSnapshotRequestData fetchSnapshotRequest(
        String clusterId,
        TopicPartition topicPartition,
        int epoch,
        OffsetAndEpoch offsetAndEpoch,
        int maxBytes,
        long position
    ) {
        FetchSnapshotRequestData.SnapshotId requestedId = new FetchSnapshotRequestData.SnapshotId()
            .setEndOffset(offsetAndEpoch.offset())
            .setEpoch(offsetAndEpoch.epoch());

        return FetchSnapshotRequest
            .singleton(
                clusterId,
                topicPartition,
                partition -> {
                    return partition
                        .setCurrentLeaderEpoch(epoch)
                        .setSnapshotId(requestedId)
                        .setPosition(position);
                }
            )
            .setMaxBytes(maxBytes);
    }

    /**
     * Builds a FetchSnapshot response for a single topic partition.
     *
     * @param topicPartition partition the response is for
     * @param leaderEpoch epoch written into the partition's current-leader section
     * @param leaderId leader id written into the partition's current-leader section
     * @param snapshotId supplies the end offset and epoch written into the partition's snapshot id
     * @param size value stored as the partition's size
     * @param position value stored as the partition's position
     * @param buffer bytes to return; a slice of it is wrapped as readable records
     * @return the populated response data
     */
    private static FetchSnapshotResponseData fetchSnapshotResponse(
        TopicPartition topicPartition,
        int leaderEpoch,
        int leaderId,
        OffsetAndEpoch snapshotId,
        long size,
        long position,
        ByteBuffer buffer
    ) {
        return FetchSnapshotResponse.singleton(topicPartition, partition -> {
            partition.currentLeader()
                .setLeaderEpoch(leaderEpoch)
                .setLeaderId(leaderId);

            partition.snapshotId()
                .setEndOffset(snapshotId.offset())
                .setEpoch(snapshotId.epoch());

            return partition
                .setSize(size)
                .setPosition(position)
                .setUnalignedRecords(MemoryRecords.readableRecords(buffer.slice()));
        });
    }

    /**
     * Builds a single-partition Fetch response with {@code Errors.NONE} whose
     * partition data carries the given high watermark, current leader and
     * snapshot id.
     *
     * @param topicPartition partition the response is for
     * @param topicId topic id passed to {@code RaftUtil.singletonFetchResponse}
     * @param epoch epoch written into the partition's current-leader section
     * @param leaderId leader id written into the partition's current-leader section
     * @param snapshotId supplies the epoch and end offset written into the partition's snapshot id
     * @param highWatermark value stored as the partition's high watermark
     * @return the populated response data
     */
    private static FetchResponseData snapshotFetchResponse(
        TopicPartition topicPartition,
        Uuid topicId,
        int epoch,
        int leaderId,
        OffsetAndEpoch snapshotId,
        long highWatermark
    ) {
        return RaftUtil.singletonFetchResponse(topicPartition, topicId, Errors.NONE, partition -> {
            partition.setHighWatermark(highWatermark);

            partition.currentLeader()
                .setLeaderEpoch(epoch)
                .setLeaderId(leaderId);

            partition.snapshotId()
                .setEpoch(snapshotId.epoch())
                .setEndOffset(snapshotId.offset());
        });
    }

    /**
     * Asserts that an outbound request carries FetchSnapshot data with the expected
     * replica id and max bytes, then looks up the entry for the given partition.
     *
     * @param request outbound request to inspect
     * @param topicPartition partition to look up in the request data
     * @param replicaId expected replica id
     * @param maxBytes expected max bytes
     * @return the result of {@code FetchSnapshotRequest.forTopicPartition} for the request data
     */
    private static Optional<FetchSnapshotRequestData.PartitionSnapshot> assertFetchSnapshotRequest(
        RaftRequest.Outbound request,
        TopicPartition topicPartition,
        int replicaId,
        int maxBytes
    ) {
        boolean isFetchSnapshot = request.data() instanceof FetchSnapshotRequestData;
        Assertions.assertTrue(isFetchSnapshot);

        FetchSnapshotRequestData fetchSnapshot = (FetchSnapshotRequestData) request.data();
        Assertions.assertEquals(replicaId, fetchSnapshot.replicaId());
        Assertions.assertEquals(maxBytes, fetchSnapshot.maxBytes());

        return FetchSnapshotRequest.forTopicPartition(fetchSnapshot, topicPartition);
    }

    /**
     * Wraps a raw snapshot writer in a string-record snapshot writer created through
     * {@code RecordsSnapshotWriter.createWithHeader}, using the test context's clock,
     * {@code MemoryPool.NONE}, no compression and a {@code StringSerde}.
     *
     * @param context test context whose {@code time} is handed to the writer
     * @param snapshot raw writer that receives the data
     * @return the wrapping snapshot writer
     */
    private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
        return RecordsSnapshotWriter.createWithHeader(
            snapshot,
            4096,
            MemoryPool.NONE,
            context.time,
            0L,
            CompressionType.NONE,
            new StringSerde()
        );
    }

    /**
     * Raw snapshot writer that accumulates appended record bytes in a growable
     * heap {@link ByteBuffer}. Once frozen, further appends are rejected and the
     * buffer is flipped so it can be read back through {@link #buffer()}.
     */
    private static final class MemorySnapshotWriter
    implements RawSnapshotWriter {
        private final OffsetAndEpoch snapshotId;
        // Backing storage; replaced by a larger buffer when an append does not fit.
        private ByteBuffer data;
        // -1 while the writer is open; the write position captured at freeze time afterwards.
        private AtomicLong frozenPosition;

        /**
         * Creates an open writer with an empty, zero-capacity buffer.
         *
         * @param snapshotId id reported by {@link #snapshotId()} and included in error messages
         */
        public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
            this.snapshotId = snapshotId;
            this.data = ByteBuffer.allocate(0);
            this.frozenPosition = new AtomicLong(-1L);
        }

        /** Returns the snapshot id this writer was created with. */
        public OffsetAndEpoch snapshotId() {
            return this.snapshotId;
        }

        /**
         * Returns the number of bytes written: the current buffer position while the
         * writer is open, or the position recorded by {@link #freeze()} afterwards.
         */
        public long sizeInBytes() {
            long recorded = this.frozenPosition.get();
            if (recorded >= 0L) {
                return recorded;
            }
            return (long) this.data.position();
        }

        /**
         * Appends the bytes of the given unaligned records.
         *
         * @throws RuntimeException if the writer is already frozen
         */
        public void append(UnalignedMemoryRecords records) {
            this.ensureNotFrozen();
            this.append(records.buffer());
        }

        /**
         * Appends the bytes of the given records.
         *
         * @throws RuntimeException if the writer is already frozen
         */
        public void append(MemoryRecords records) {
            this.ensureNotFrozen();
            this.append(records.buffer());
        }

        /** Throws if {@link #isFrozen()} reports that the writer has been frozen. */
        private void ensureNotFrozen() {
            if (this.isFrozen()) {
                throw new RuntimeException("Snapshot is already frozen " + this.snapshotId);
            }
        }

        /**
         * Copies the remaining bytes of {@code buffer} into the backing buffer,
         * first moving the existing content into a larger buffer when there is not
         * enough room.
         */
        private void append(ByteBuffer buffer) {
            boolean fits = this.data.remaining() >= buffer.remaining();
            if (!fits) {
                ByteBuffer previous = this.data;
                // Flip so the bytes written so far become readable for the copy below.
                previous.flip();
                int doubled = this.data.capacity() * 2;
                int exactFit = this.data.capacity() + buffer.remaining();
                this.data = ByteBuffer.allocate(Math.max(doubled, exactFit));
                this.data.put(previous);
            }
            this.data.put(buffer);
        }

        /** Returns {@code true} once {@link #freeze()} has recorded a position. */
        public boolean isFrozen() {
            return this.frozenPosition.get() >= 0L;
        }

        /**
         * Records the current write position and flips the buffer for reading.
         *
         * @throws RuntimeException if the writer is already frozen
         */
        public void freeze() {
            long writtenBytes = this.data.position();
            boolean firstFreeze = this.frozenPosition.compareAndSet(-1L, writtenBytes);
            if (!firstFreeze) {
                throw new RuntimeException("Snapshot is already frozen " + this.snapshotId);
            }
            this.data.flip();
        }

        /** Does nothing; there is nothing to release. */
        public void close() {
        }

        /** Returns the backing buffer itself, not a copy. */
        public ByteBuffer buffer() {
            return this.data;
        }
    }
}

