/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LogFetchInfo;
import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.MockLog;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.ValidOffsetAndEpoch;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class MockLogTest {
    private MockLog log;
    private final TopicPartition topicPartition = new TopicPartition("mock-topic", 0);
    private final Uuid topicId = Uuid.randomUuid();

    @BeforeEach
    public void setup() {
        this.log = new MockLog(this.topicPartition, this.topicId, new LogContext());
    }

    @AfterEach
    public void cleanup() {
        this.log.close();
    }

    @Test
    public void testTopicPartition() {
        Assertions.assertEquals((Object)this.topicPartition, (Object)this.log.topicPartition());
    }

    @Test
    public void testTopicId() {
        Assertions.assertEquals((Object)this.topicId, (Object)this.log.topicId());
    }

    @Test
    public void testTruncateTo() {
        int epoch = 2;
        SimpleRecord recordOne = new SimpleRecord("one".getBytes());
        SimpleRecord recordTwo = new SimpleRecord("two".getBytes());
        this.appendAsLeader(Arrays.asList(recordOne, recordTwo), epoch);
        SimpleRecord recordThree = new SimpleRecord("three".getBytes());
        this.appendAsLeader(Collections.singleton(recordThree), epoch);
        Assertions.assertEquals((long)0L, (long)this.log.startOffset());
        Assertions.assertEquals((long)3L, (long)this.log.endOffset().offset);
        this.log.truncateTo(2L);
        Assertions.assertEquals((long)0L, (long)this.log.startOffset());
        Assertions.assertEquals((long)2L, (long)this.log.endOffset().offset);
        this.log.truncateTo(1L);
        Assertions.assertEquals((long)0L, (long)this.log.startOffset());
        Assertions.assertEquals((long)0L, (long)this.log.endOffset().offset);
    }

    @Test
    public void testTruncateBelowHighWatermark() {
        this.appendBatch(5, 1);
        LogOffsetMetadata highWatermark = new LogOffsetMetadata(5L);
        this.log.updateHighWatermark(highWatermark);
        Assertions.assertEquals((Object)highWatermark, (Object)this.log.highWatermark());
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.truncateTo(4L));
        Assertions.assertEquals((Object)highWatermark, (Object)this.log.highWatermark());
    }

    @Test
    public void testUpdateHighWatermark() {
        this.appendBatch(5, 1);
        LogOffsetMetadata newOffset = new LogOffsetMetadata(5L);
        this.log.updateHighWatermark(newOffset);
        Assertions.assertEquals((long)newOffset.offset, (long)this.log.highWatermark().offset);
    }

    @Test
    public void testDecrementHighWatermark() {
        this.appendBatch(5, 1);
        LogOffsetMetadata newOffset = new LogOffsetMetadata(4L);
        this.log.updateHighWatermark(newOffset);
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.updateHighWatermark(new LogOffsetMetadata(3L)));
    }

    @Test
    public void testAssignEpochStartOffset() {
        this.log.initializeLeaderEpoch(2);
        Assertions.assertEquals((int)2, (int)this.log.lastFetchedEpoch());
    }

    @Test
    public void testAppendAsLeader() {
        int epoch = 2;
        SimpleRecord recordOne = new SimpleRecord("one".getBytes());
        ArrayList<SimpleRecord> expectedRecords = new ArrayList<SimpleRecord>();
        expectedRecords.add(recordOne);
        this.appendAsLeader(Collections.singleton(recordOne), epoch);
        Assertions.assertEquals((Object)new OffsetAndEpoch((long)expectedRecords.size(), epoch), (Object)this.log.endOffsetForEpoch(epoch));
        Assertions.assertEquals((int)epoch, (int)this.log.lastFetchedEpoch());
        MockLogTest.validateReadRecords(expectedRecords, this.log);
        SimpleRecord recordTwo = new SimpleRecord("two".getBytes());
        SimpleRecord recordThree = new SimpleRecord("three".getBytes());
        expectedRecords.add(recordTwo);
        expectedRecords.add(recordThree);
        this.appendAsLeader(Arrays.asList(recordTwo, recordThree), epoch);
        Assertions.assertEquals((Object)new OffsetAndEpoch((long)expectedRecords.size(), epoch), (Object)this.log.endOffsetForEpoch(epoch));
        Assertions.assertEquals((int)epoch, (int)this.log.lastFetchedEpoch());
        MockLogTest.validateReadRecords(expectedRecords, this.log);
    }

    @Test
    public void testUnexpectedAppendOffset() {
        SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
        int currentEpoch = 3;
        long initialOffset = this.log.endOffset().offset;
        this.log.appendAsLeader((Records)MemoryRecords.withRecords((long)initialOffset, (CompressionType)CompressionType.NONE, (Integer)3, (SimpleRecord[])new SimpleRecord[]{recordFoo}), 3);
        Assertions.assertThrows(RuntimeException.class, () -> this.log.appendAsLeader((Records)MemoryRecords.withRecords((long)initialOffset, (CompressionType)CompressionType.NONE, (Integer)3, (SimpleRecord[])new SimpleRecord[]{recordFoo}), 3));
        Assertions.assertThrows(RuntimeException.class, () -> this.log.appendAsFollower((Records)MemoryRecords.withRecords((long)initialOffset, (CompressionType)CompressionType.NONE, (Integer)3, (SimpleRecord[])new SimpleRecord[]{recordFoo})));
    }

    @Test
    public void testAppendControlRecord() {
        long initialOffset = 0L;
        int currentEpoch = 3;
        LeaderChangeMessage messageData = new LeaderChangeMessage().setLeaderId(0);
        ByteBuffer buffer = ByteBuffer.allocate(256);
        this.log.appendAsLeader((Records)MemoryRecords.withLeaderChangeMessage((long)0L, (long)0L, (int)2, (ByteBuffer)buffer, (LeaderChangeMessage)messageData), 3);
        Assertions.assertEquals((long)0L, (long)this.log.startOffset());
        Assertions.assertEquals((long)1L, (long)this.log.endOffset().offset);
        Assertions.assertEquals((int)3, (int)this.log.lastFetchedEpoch());
        Records records = this.log.read((long)0L, (Isolation)Isolation.UNCOMMITTED).records;
        for (RecordBatch batch : records.batches()) {
            Assertions.assertTrue((boolean)batch.isControlBatch());
        }
        ArrayList<ByteBuffer> extractRecords = new ArrayList<ByteBuffer>();
        for (Record record : records.records()) {
            LeaderChangeMessage deserializedData = ControlRecordUtils.deserializeLeaderChangeMessage((Record)record);
            Assertions.assertEquals((Object)deserializedData, (Object)messageData);
            extractRecords.add(record.value());
        }
        Assertions.assertEquals((int)1, (int)extractRecords.size());
        Assertions.assertEquals((Object)new OffsetAndEpoch(1L, 3), (Object)this.log.endOffsetForEpoch(3));
    }

    @Test
    public void testAppendAsFollower() throws IOException {
        long initialOffset = 5L;
        int epoch = 3;
        SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(new OffsetAndEpoch(5L, 0)).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        this.log.appendAsFollower((Records)MemoryRecords.withRecords((long)5L, (CompressionType)CompressionType.NONE, (Integer)3, (SimpleRecord[])new SimpleRecord[]{recordFoo}));
        Assertions.assertEquals((long)5L, (long)this.log.startOffset());
        Assertions.assertEquals((long)6L, (long)this.log.endOffset().offset);
        Assertions.assertEquals((int)3, (int)this.log.lastFetchedEpoch());
        Records records = this.log.read((long)5L, (Isolation)Isolation.UNCOMMITTED).records;
        ArrayList<ByteBuffer> extractRecords = new ArrayList<ByteBuffer>();
        for (Record record : records.records()) {
            extractRecords.add(record.value());
        }
        Assertions.assertEquals((int)1, (int)extractRecords.size());
        Assertions.assertEquals((Object)recordFoo.value(), extractRecords.get(0));
        Assertions.assertEquals((Object)new OffsetAndEpoch(5L, 0), (Object)this.log.endOffsetForEpoch(0));
        Assertions.assertEquals((Object)new OffsetAndEpoch(this.log.endOffset().offset, 3), (Object)this.log.endOffsetForEpoch(3));
    }

    @Test
    public void testReadRecords() {
        int epoch = 2;
        ByteBuffer recordOneBuffer = ByteBuffer.allocate(4);
        recordOneBuffer.putInt(1);
        SimpleRecord recordOne = new SimpleRecord(recordOneBuffer);
        ByteBuffer recordTwoBuffer = ByteBuffer.allocate(4);
        recordTwoBuffer.putInt(2);
        SimpleRecord recordTwo = new SimpleRecord(recordTwoBuffer);
        this.appendAsLeader(Arrays.asList(recordOne, recordTwo), epoch);
        Records records = this.log.read((long)0L, (Isolation)Isolation.UNCOMMITTED).records;
        ArrayList<ByteBuffer> extractRecords = new ArrayList<ByteBuffer>();
        for (Record record : records.records()) {
            extractRecords.add(record.value());
        }
        Assertions.assertEquals(Arrays.asList(recordOne.value(), recordTwo.value()), extractRecords);
    }

    @Test
    public void testReadUpToLogEnd() {
        this.appendBatch(20, 1);
        this.appendBatch(10, 1);
        this.appendBatch(30, 1);
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 59L)), this.readOffsets(0L, Isolation.UNCOMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 59L)), this.readOffsets(10L, Isolation.UNCOMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(20L, 59L)), this.readOffsets(20L, Isolation.UNCOMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(20L, 59L)), this.readOffsets(25L, Isolation.UNCOMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(30L, 59L)), this.readOffsets(30L, Isolation.UNCOMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(30L, 59L)), this.readOffsets(33L, Isolation.UNCOMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(60L, Isolation.UNCOMMITTED));
        Assertions.assertThrows(OffsetOutOfRangeException.class, () -> this.log.read(61L, Isolation.UNCOMMITTED));
        this.log.truncateTo(20L);
        Assertions.assertThrows(OffsetOutOfRangeException.class, () -> this.log.read(21L, Isolation.UNCOMMITTED));
    }

    @Test
    public void testReadUpToHighWatermark() {
        this.appendBatch(20, 1);
        this.appendBatch(10, 1);
        this.appendBatch(30, 1);
        this.log.updateHighWatermark(new LogOffsetMetadata(0L));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(0L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(10L, Isolation.COMMITTED));
        this.log.updateHighWatermark(new LogOffsetMetadata(20L));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 19L)), this.readOffsets(0L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 19L)), this.readOffsets(10L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(20L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(30L, Isolation.COMMITTED));
        this.log.updateHighWatermark(new LogOffsetMetadata(30L));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 29L)), this.readOffsets(0L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 29L)), this.readOffsets(10L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(20L, 29L)), this.readOffsets(20L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(20L, 29L)), this.readOffsets(25L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(30L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(50L, Isolation.COMMITTED));
        this.log.updateHighWatermark(new LogOffsetMetadata(60L));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 59L)), this.readOffsets(0L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(0L, 59L)), this.readOffsets(10L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(20L, 59L)), this.readOffsets(20L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(20L, 59L)), this.readOffsets(25L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(30L, 59L)), this.readOffsets(30L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.of(new OffsetRange(30L, 59L)), this.readOffsets(50L, Isolation.COMMITTED));
        Assertions.assertEquals(Optional.empty(), this.readOffsets(60L, Isolation.COMMITTED));
        Assertions.assertThrows(OffsetOutOfRangeException.class, () -> this.log.read(61L, Isolation.COMMITTED));
    }

    @Test
    public void testMetadataValidation() {
        this.appendBatch(5, 1);
        this.appendBatch(5, 1);
        this.appendBatch(5, 1);
        LogFetchInfo readInfo = this.log.read(5L, Isolation.UNCOMMITTED);
        Assertions.assertEquals((long)5L, (long)readInfo.startOffsetMetadata.offset);
        Assertions.assertTrue((boolean)readInfo.startOffsetMetadata.metadata.isPresent());
        MockLog.MockOffsetMetadata offsetMetadata = (MockLog.MockOffsetMetadata)readInfo.startOffsetMetadata.metadata.get();
        this.log.updateHighWatermark(readInfo.startOffsetMetadata);
        Assertions.assertEquals((long)readInfo.startOffsetMetadata.offset, (long)this.log.highWatermark().offset);
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.updateHighWatermark(new LogOffsetMetadata(10L, Optional.of(new MockLog.MockOffsetMetadata(98230980L)))));
        LogFetchInfo readFromEndInfo = this.log.read(15L, Isolation.UNCOMMITTED);
        Assertions.assertEquals((long)15L, (long)readFromEndInfo.startOffsetMetadata.offset);
        Assertions.assertTrue((boolean)readFromEndInfo.startOffsetMetadata.metadata.isPresent());
        this.log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
        this.appendBatch(5, 1);
        this.log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
        LogFetchInfo readFromMiddleInfo = this.log.read(16L, Isolation.UNCOMMITTED);
        Assertions.assertEquals((Object)readFromEndInfo.startOffsetMetadata, (Object)readFromMiddleInfo.startOffsetMetadata);
    }

    @Test
    public void testEndOffsetForEpoch() {
        this.appendBatch(5, 1);
        this.appendBatch(10, 1);
        this.appendBatch(5, 3);
        this.appendBatch(10, 4);
        Assertions.assertEquals((Object)new OffsetAndEpoch(0L, 0), (Object)this.log.endOffsetForEpoch(0));
        Assertions.assertEquals((Object)new OffsetAndEpoch(15L, 1), (Object)this.log.endOffsetForEpoch(1));
        Assertions.assertEquals((Object)new OffsetAndEpoch(15L, 1), (Object)this.log.endOffsetForEpoch(2));
        Assertions.assertEquals((Object)new OffsetAndEpoch(20L, 3), (Object)this.log.endOffsetForEpoch(3));
        Assertions.assertEquals((Object)new OffsetAndEpoch(30L, 4), (Object)this.log.endOffsetForEpoch(4));
        Assertions.assertEquals((Object)new OffsetAndEpoch(30L, 4), (Object)this.log.endOffsetForEpoch(5));
    }

    @Test
    public void testEmptyAppendNotAllowed() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.appendAsFollower((Records)MemoryRecords.EMPTY));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.appendAsLeader((Records)MemoryRecords.EMPTY, 1));
    }

    @Test
    public void testReadOutOfRangeOffset() throws IOException {
        long initialOffset = 5L;
        int epoch = 3;
        SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(new OffsetAndEpoch(5L, 0)).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        this.log.appendAsFollower((Records)MemoryRecords.withRecords((long)5L, (CompressionType)CompressionType.NONE, (Integer)3, (SimpleRecord[])new SimpleRecord[]{recordFoo}));
        Assertions.assertThrows(OffsetOutOfRangeException.class, () -> this.log.read(this.log.startOffset() - 1L, Isolation.UNCOMMITTED));
        Assertions.assertThrows(OffsetOutOfRangeException.class, () -> this.log.read(this.log.endOffset().offset + 1L, Isolation.UNCOMMITTED));
    }

    @Test
    public void testMonotonicEpochStartOffset() {
        this.appendBatch(5, 1);
        Assertions.assertEquals((long)5L, (long)this.log.endOffset().offset);
        this.log.initializeLeaderEpoch(2);
        Assertions.assertEquals((Object)new OffsetAndEpoch(5L, 1), (Object)this.log.endOffsetForEpoch(1));
        Assertions.assertEquals((Object)new OffsetAndEpoch(5L, 2), (Object)this.log.endOffsetForEpoch(2));
        this.log.initializeLeaderEpoch(3);
        Assertions.assertEquals((Object)new OffsetAndEpoch(5L, 1), (Object)this.log.endOffsetForEpoch(1));
        Assertions.assertEquals((Object)new OffsetAndEpoch(5L, 1), (Object)this.log.endOffsetForEpoch(2));
        Assertions.assertEquals((Object)new OffsetAndEpoch(5L, 3), (Object)this.log.endOffsetForEpoch(3));
    }

    @Test
    public void testUnflushedRecordsLostAfterReopen() {
        this.appendBatch(5, 1);
        this.appendBatch(10, 2);
        this.log.flush(false);
        this.appendBatch(5, 3);
        this.appendBatch(10, 4);
        this.log.reopen();
        Assertions.assertEquals((long)15L, (long)this.log.endOffset().offset);
        Assertions.assertEquals((int)2, (int)this.log.lastFetchedEpoch());
    }

    @Test
    public void testCreateSnapshot() throws IOException {
        int numberOfRecords = 10;
        int epoch = 0;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(snapshotId).get();){
            snapshot.freeze();
        }
        snapshot = this.log.readSnapshot(snapshotId).get();
        Assertions.assertEquals((long)0L, (long)snapshot.sizeInBytes());
    }

    @Test
    public void testCreateSnapshotValidation() {
        int numberOfRecords = 10;
        int firstEpoch = 1;
        int secondEpoch = 3;
        this.appendBatch(numberOfRecords, firstEpoch);
        this.appendBatch(numberOfRecords, secondEpoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)(2 * numberOfRecords)));
        this.log.createNewSnapshot(new OffsetAndEpoch((long)numberOfRecords, firstEpoch)).get().close();
        this.log.createNewSnapshot(new OffsetAndEpoch((long)(numberOfRecords - 1), firstEpoch)).get().close();
        this.log.createNewSnapshot(new OffsetAndEpoch(1L, firstEpoch)).get().close();
        this.log.createNewSnapshot(new OffsetAndEpoch((long)(2 * numberOfRecords), secondEpoch)).get().close();
        this.log.createNewSnapshot(new OffsetAndEpoch((long)(2 * numberOfRecords - 1), secondEpoch)).get().close();
        this.log.createNewSnapshot(new OffsetAndEpoch((long)(numberOfRecords + 1), secondEpoch)).get().close();
    }

    @Test
    public void testCreateSnapshotLaterThanHighWatermark() {
        int numberOfRecords = 10;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.createNewSnapshot(new OffsetAndEpoch((long)(numberOfRecords + 1), epoch)));
    }

    @Test
    public void testCreateSnapshotMuchLaterEpoch() {
        int numberOfRecords = 10;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.createNewSnapshot(new OffsetAndEpoch((long)numberOfRecords, epoch + 1)));
    }

    @Test
    public void testCreateSnapshotBeforeLogStartOffset() {
        int numberOfRecords = 10;
        int epoch = 1;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(snapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.deleteBeforeSnapshot(snapshotId));
        Assertions.assertEquals((long)snapshotId.offset(), (long)this.log.startOffset());
        Assertions.assertEquals(Optional.empty(), this.log.createNewSnapshot(new OffsetAndEpoch((long)(numberOfRecords - 1), epoch)));
    }

    @Test
    public void testCreateSnapshotMuchEalierEpoch() {
        int numberOfRecords = 10;
        int epoch = 2;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(snapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.deleteBeforeSnapshot(snapshotId));
        Assertions.assertEquals((long)snapshotId.offset(), (long)this.log.startOffset());
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.createNewSnapshot(new OffsetAndEpoch((long)numberOfRecords, epoch - 1)));
    }

    @Test
    public void testCreateSnapshotWithMissingEpoch() {
        int firstBatchRecords = 5;
        int firstEpoch = 1;
        int missingEpoch = firstEpoch + 1;
        int secondBatchRecords = 5;
        int secondEpoch = missingEpoch + 1;
        int numberOfRecords = firstBatchRecords + secondBatchRecords;
        this.appendBatch(firstBatchRecords, firstEpoch);
        this.appendBatch(secondBatchRecords, secondEpoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.createNewSnapshot(new OffsetAndEpoch(1L, missingEpoch)));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.createNewSnapshot(new OffsetAndEpoch((long)firstBatchRecords, missingEpoch)));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.log.createNewSnapshot(new OffsetAndEpoch((long)secondBatchRecords, missingEpoch)));
    }

    @Test
    public void testCreateExistingSnapshot() {
        int numberOfRecords = 10;
        int epoch = 1;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)numberOfRecords));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(snapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.deleteBeforeSnapshot(snapshotId));
        Assertions.assertEquals((long)snapshotId.offset(), (long)this.log.startOffset());
        Assertions.assertEquals(Optional.empty(), this.log.createNewSnapshot(snapshotId));
    }

    @Test
    public void testReadMissingSnapshot() {
        Assertions.assertFalse((boolean)this.log.readSnapshot(new OffsetAndEpoch(10L, 0)).isPresent());
    }

    @Test
    public void testUpdateLogStartOffset() throws IOException {
        int offset = 10;
        int epoch = 0;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch((long)offset, epoch);
        this.appendBatch(offset, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)offset));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(snapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.deleteBeforeSnapshot(snapshotId));
        Assertions.assertEquals((long)offset, (long)this.log.startOffset());
        Assertions.assertEquals((int)epoch, (int)this.log.lastFetchedEpoch());
        Assertions.assertEquals((long)offset, (long)this.log.endOffset().offset);
        int newRecords = 10;
        this.appendBatch(newRecords, epoch + 1);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)(offset + newRecords)));
        Assertions.assertFalse((boolean)this.log.deleteBeforeSnapshot(new OffsetAndEpoch((long)(offset + newRecords), epoch)));
        Assertions.assertEquals((long)offset, (long)this.log.startOffset());
        Assertions.assertEquals((int)(epoch + 1), (int)this.log.lastFetchedEpoch());
        Assertions.assertEquals((long)(offset + newRecords), (long)this.log.endOffset().offset);
        Assertions.assertEquals((long)(offset + newRecords), (long)this.log.highWatermark().offset);
    }

    @Test
    public void testUpdateLogStartOffsetWithMissingSnapshot() {
        int offset = 10;
        int epoch = 0;
        this.appendBatch(offset, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)offset));
        Assertions.assertFalse((boolean)this.log.deleteBeforeSnapshot(new OffsetAndEpoch(1L, epoch)));
        Assertions.assertEquals((long)0L, (long)this.log.startOffset());
        Assertions.assertEquals((int)epoch, (int)this.log.lastFetchedEpoch());
        Assertions.assertEquals((long)offset, (long)this.log.endOffset().offset);
        Assertions.assertEquals((long)offset, (long)this.log.highWatermark().offset);
    }

    @Test
    public void testFailToIncreaseLogStartPastHighWatermark() throws IOException {
        int offset = 10;
        int epoch = 0;
        OffsetAndEpoch snapshotId = new OffsetAndEpoch((long)(2 * offset), epoch);
        this.appendBatch(3 * offset, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata((long)offset));
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(snapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertThrows(OffsetOutOfRangeException.class, () -> this.log.deleteBeforeSnapshot(snapshotId));
    }

    @Test
    public void testTruncateFullyToLatestSnapshot() throws IOException {
        int numberOfRecords = 10;
        int epoch = 0;
        OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch((long)(2 * numberOfRecords), epoch);
        this.appendBatch(numberOfRecords, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(sameEpochSnapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.truncateToLatestSnapshot());
        Assertions.assertEquals((long)sameEpochSnapshotId.offset(), (long)this.log.startOffset());
        Assertions.assertEquals((int)sameEpochSnapshotId.epoch(), (int)this.log.lastFetchedEpoch());
        Assertions.assertEquals((long)sameEpochSnapshotId.offset(), (long)this.log.endOffset().offset);
        Assertions.assertEquals((long)sameEpochSnapshotId.offset(), (long)this.log.highWatermark().offset);
        OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch((long)(3 * numberOfRecords), epoch + 1);
        this.appendBatch(numberOfRecords, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(greaterEpochSnapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.truncateToLatestSnapshot());
        Assertions.assertEquals((long)greaterEpochSnapshotId.offset(), (long)this.log.startOffset());
        Assertions.assertEquals((int)greaterEpochSnapshotId.epoch(), (int)this.log.lastFetchedEpoch());
        Assertions.assertEquals((long)greaterEpochSnapshotId.offset(), (long)this.log.endOffset().offset);
        Assertions.assertEquals((long)greaterEpochSnapshotId.offset(), (long)this.log.highWatermark().offset);
    }

    @Test
    public void testDoesntTruncateFully() throws IOException {
        int numberOfRecords = 10;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch - 1);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderEpochSnapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertFalse((boolean)this.log.truncateToLatestSnapshot());
        this.appendBatch(numberOfRecords, epoch);
        OffsetAndEpoch olderOffsetSnapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderOffsetSnapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertFalse((boolean)this.log.truncateToLatestSnapshot());
    }

    @Test
    public void testTruncateWillRemoveOlderSnapshot() throws IOException {
        int numberOfRecords = 10;
        int epoch = 1;
        OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata(sameEpochSnapshotId.offset()));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(sameEpochSnapshotId).get();){
            snapshot.freeze();
        }
        OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch((long)(2 * numberOfRecords), epoch + 1);
        this.appendBatch(numberOfRecords, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(greaterEpochSnapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.truncateToLatestSnapshot());
        Assertions.assertEquals(Optional.empty(), this.log.readSnapshot(sameEpochSnapshotId));
    }

    @Test
    public void testUpdateLogStartOffsetWillRemoveOlderSnapshot() throws IOException {
        int numberOfRecords = 10;
        int epoch = 1;
        OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch((long)numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch);
        this.log.updateHighWatermark(new LogOffsetMetadata(sameEpochSnapshotId.offset()));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(sameEpochSnapshotId).get();){
            snapshot.freeze();
        }
        OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch((long)(2 * numberOfRecords), epoch + 1);
        this.appendBatch(numberOfRecords, greaterEpochSnapshotId.epoch());
        this.log.updateHighWatermark(new LogOffsetMetadata(greaterEpochSnapshotId.offset()));
        try (RawSnapshotWriter snapshot = this.log.createNewSnapshot(greaterEpochSnapshotId).get();){
            snapshot.freeze();
        }
        Assertions.assertTrue((boolean)this.log.deleteBeforeSnapshot(greaterEpochSnapshotId));
        Assertions.assertEquals(Optional.empty(), this.log.readSnapshot(sameEpochSnapshotId));
    }

    @Test
    public void testValidateEpochGreaterThanLastKnownEpoch() {
        int numberOfRecords = 1;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.diverging((OffsetAndEpoch)new OffsetAndEpoch(this.log.endOffset().offset, epoch)), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateEpochLessThanOldestSnapshotEpoch() throws IOException {
        int offset = 1;
        int epoch = 1;
        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch((long)offset, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderEpochSnapshotId).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(offset, epoch - 1);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.snapshot((OffsetAndEpoch)olderEpochSnapshotId), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateOffsetLessThanOldestSnapshotOffset() throws IOException {
        int offset = 2;
        int epoch = 1;
        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch((long)offset, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderEpochSnapshotId).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(offset - 1, epoch);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.snapshot((OffsetAndEpoch)olderEpochSnapshotId), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateOffsetEqualToOldestSnapshotOffset() throws IOException {
        int offset = 2;
        int epoch = 1;
        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch((long)offset, epoch);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderEpochSnapshotId).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(offset, epoch);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.Kind.VALID, (Object)resultOffsetAndEpoch.kind());
    }

    @Test
    public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() throws IOException {
        int numberOfRecords = 5;
        int offset = 10;
        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch((long)offset, 1);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderEpochSnapshotId).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        this.appendBatch(numberOfRecords, 1);
        this.appendBatch(numberOfRecords, 2);
        this.appendBatch(numberOfRecords, 4);
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(100L, 3);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.diverging((OffsetAndEpoch)new OffsetAndEpoch(20L, 2)), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateEpochLessThanFirstEpochInLog() throws IOException {
        int numberOfRecords = 5;
        int offset = 10;
        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch((long)offset, 1);
        try (RawSnapshotWriter snapshot = this.log.storeSnapshot(olderEpochSnapshotId).get();){
            snapshot.freeze();
        }
        this.log.truncateToLatestSnapshot();
        this.appendBatch(numberOfRecords, 3);
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(100L, 2);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.diverging((OffsetAndEpoch)olderEpochSnapshotId), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateOffsetGreatThanEndOffset() {
        int numberOfRecords = 1;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.diverging((OffsetAndEpoch)new OffsetAndEpoch(this.log.endOffset().offset, epoch)), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateOffsetLessThanLEO() {
        int numberOfRecords = 10;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        this.appendBatch(numberOfRecords, epoch + 1);
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(11L, epoch);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.diverging((OffsetAndEpoch)new OffsetAndEpoch(10L, epoch)), (Object)resultOffsetAndEpoch);
    }

    @Test
    public void testValidateValidEpochAndOffset() {
        int numberOfRecords = 5;
        int epoch = 1;
        this.appendBatch(numberOfRecords, epoch);
        ValidOffsetAndEpoch resultOffsetAndEpoch = this.log.validateOffsetAndEpoch(numberOfRecords - 1, epoch);
        Assertions.assertEquals((Object)ValidOffsetAndEpoch.Kind.VALID, (Object)resultOffsetAndEpoch.kind());
    }

    private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
        long firstReadOffset = -1L;
        long lastReadOffset = -1L;
        long currentStart = startOffset;
        boolean foundRecord = true;
        while (foundRecord) {
            foundRecord = false;
            Records records = this.log.read((long)currentStart, (Isolation)isolation).records;
            for (Record record : records.records()) {
                foundRecord = true;
                if (firstReadOffset < 0L) {
                    firstReadOffset = record.offset();
                }
                if (record.offset() <= lastReadOffset) continue;
                lastReadOffset = record.offset();
            }
            currentStart = lastReadOffset + 1L;
        }
        if (firstReadOffset < 0L) {
            return Optional.empty();
        }
        return Optional.of(new OffsetRange(firstReadOffset, lastReadOffset));
    }

    private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
        this.log.appendAsLeader((Records)MemoryRecords.withRecords((long)this.log.endOffset().offset, (CompressionType)CompressionType.NONE, (SimpleRecord[])records.toArray(new SimpleRecord[records.size()])), epoch);
    }

    private void appendBatch(int numRecords, int epoch) {
        ArrayList<SimpleRecord> records = new ArrayList<SimpleRecord>(numRecords);
        for (int i = 0; i < numRecords; ++i) {
            records.add(new SimpleRecord(String.valueOf(i).getBytes()));
        }
        this.appendAsLeader(records, epoch);
    }

    private static void validateReadRecords(List<SimpleRecord> expectedRecords, MockLog log) {
        Assertions.assertEquals((long)0L, (long)log.startOffset());
        Assertions.assertEquals((long)expectedRecords.size(), (long)log.endOffset().offset);
        int currentOffset = 0;
        while ((long)currentOffset < log.endOffset().offset) {
            Records records = log.read((long)((long)currentOffset), (Isolation)Isolation.UNCOMMITTED).records;
            List batches = Utils.toList(records.batches().iterator());
            Assertions.assertTrue((batches.size() > 0 ? 1 : 0) != 0);
            for (RecordBatch batch : batches) {
                Assertions.assertTrue((batch.countOrNull() > 0 ? 1 : 0) != 0);
                Assertions.assertEquals((long)currentOffset, (long)batch.baseOffset());
                Assertions.assertEquals((long)(currentOffset + batch.countOrNull() - 1), (long)batch.lastOffset());
                for (Record record : batch) {
                    Assertions.assertEquals((long)currentOffset, (long)record.offset());
                    Assertions.assertEquals((Object)expectedRecords.get(currentOffset), (Object)new SimpleRecord(record));
                    ++currentOffset;
                }
                Assertions.assertEquals((long)(currentOffset - 1), (long)batch.lastOffset());
            }
        }
    }

    private static class OffsetRange {
        public final long startOffset;
        public final long endOffset;

        private OffsetRange(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            OffsetRange that = (OffsetRange)o;
            return this.startOffset == that.startOffset && this.endOffset == that.endOffset;
        }

        public int hashCode() {
            return Objects.hash(this.startOffset, this.endOffset);
        }

        public String toString() {
            return String.format("OffsetRange(startOffset=%s, endOffset=%s)", this.startOffset, this.endOffset);
        }
    }
}

