/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.RecordsIterator;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

public final class RecordsIteratorTest {
    /** Serde used by the helpers in this test to write and read {@code String} record payloads. */
    private static final RecordSerde<String> STRING_SERDE = new StringSerde();

    /**
     * Method source for {@link #testEmptyRecords(Records)}.
     *
     * @return one argument set per empty {@link Records} flavour: a {@link FileRecords} opened on a
     *         temp file to which nothing has been appended, and {@link MemoryRecords#EMPTY}
     * @throws IOException if the temp file cannot be created or opened
     */
    private static Stream<Arguments> emptyRecords() throws IOException {
        Records emptyFileRecords = FileRecords.open(TestUtils.tempFile());
        Records emptyMemoryRecords = MemoryRecords.EMPTY;
        return Stream.of(emptyFileRecords, emptyMemoryRecords).map(Arguments::of);
    }

    /**
     * Iterating over a {@link Records} instance that holds no batches must yield nothing.
     *
     * @param records an empty records instance supplied by {@code emptyRecords()}
     */
    @ParameterizedTest
    @MethodSource("emptyRecords")
    void testEmptyRecords(Records records) {
        List<TestBatch<String>> noBatches = Collections.emptyList();
        testIterator(noBatches, records, true);
    }

    /**
     * Property: batches generated from {@code seed} and written to in-memory records with the given
     * compression are read back unchanged by the iterator (CRC validation enabled).
     */
    @Property(tries = 50)
    public void testMemoryRecords(@ForAll CompressionType compressionType, @ForAll long seed) {
        List<TestBatch<String>> expected = createBatches(seed);
        Records inMemory = buildRecords(compressionType, expected);
        testIterator(expected, inMemory, true);
    }

    /**
     * Property: batches generated from {@code seed}, written with the given compression and then
     * appended to a file-backed {@link FileRecords}, are read back unchanged by the iterator
     * (CRC validation enabled).
     *
     * @throws IOException if the temp file cannot be created, written, or closed
     */
    @Property(tries = 50)
    public void testFileRecords(@ForAll CompressionType compressionType, @ForAll long seed) throws IOException {
        List<TestBatch<String>> batches = createBatches(seed);
        MemoryRecords memRecords = buildRecords(compressionType, batches);

        // try-with-resources so the file handle is released even when an assertion fails.
        try (FileRecords fileRecords = FileRecords.open(TestUtils.tempFile())) {
            fileRecords.append(memRecords);
            testIterator(batches, fileRecords, true);
        }
    }

    /**
     * Property: corrupting the CRC of the first batch makes iteration fail with
     * {@link CorruptRecordException} when CRC validation is on, is tolerated when validation is
     * off, and restoring the original CRC makes iteration succeed again. Each step is checked for
     * both memory-backed and file-backed records.
     *
     * @throws IOException if a temp file cannot be created, written, or closed
     */
    @Property(tries = 50)
    public void testCrcValidation(@ForAll CompressionType compressionType, @ForAll long seed) throws IOException {
        // Byte position of the 4-byte value that is corrupted below. In the v2 record batch layout
        // (8-byte base offset, 4-byte length, 4-byte leader epoch, 1-byte magic) this is the CRC
        // field of the first batch.
        final int crcOffset = 17;

        List<TestBatch<String>> batches = createBatches(seed);
        MemoryRecords memRecords = buildRecords(compressionType, batches);

        // Remember the genuine CRC so it can be restored later.
        int actualCrc = memRecords.buffer().getInt(crcOffset);

        // Corrupt the CRC of the first batch.
        memRecords.buffer().putInt(crcOffset, actualCrc + 1);
        Assertions.assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true));

        // The file copy is taken while the CRC is still corrupted.
        try (FileRecords corruptedFileRecords = FileRecords.open(TestUtils.tempFile())) {
            corruptedFileRecords.append(memRecords);
            Assertions.assertThrows(
                CorruptRecordException.class,
                () -> testIterator(batches, corruptedFileRecords, true)
            );

            // With validation disabled the corrupted CRC must be ignored.
            Assertions.assertDoesNotThrow(() -> testIterator(batches, memRecords, false));
            Assertions.assertDoesNotThrow(() -> testIterator(batches, corruptedFileRecords, false));
        }

        // Restore the CRC; validation must pass again for both backings.
        memRecords.buffer().putInt(crcOffset, actualCrc);
        Assertions.assertDoesNotThrow(() -> testIterator(batches, memRecords, true));

        try (FileRecords restoredFileRecords = FileRecords.open(TestUtils.tempFile())) {
            restoredFileRecords.append(memRecords);
            Assertions.assertDoesNotThrow(() -> testIterator(batches, restoredFileRecords, true));
        }
    }

    /**
     * Writes a snapshot with a header, three data batches and (after {@code freeze()}) reads it
     * back, asserting that the first batch is a single SNAPSHOT_HEADER control record, that the
     * next batch carrying control records is a single SNAPSHOT_FOOTER, and that nothing follows.
     */
    @Test
    public void testControlRecordIteration() {
        // Receives the frozen snapshot bytes from the mock raw writer.
        AtomicReference<ByteBuffer> buffer = new AtomicReference<>();

        try (RecordsSnapshotWriter<String> snapshot = RecordsSnapshotWriter.createWithHeader(
                new MockRawSnapshotWriter(new OffsetAndEpoch(100L, 10), buffer::set),
                4096,
                MemoryPool.NONE,
                new MockTime(),
                0L,
                CompressionType.NONE,
                STRING_SERDE)) {
            snapshot.append(Arrays.asList("a", "b", "c"));
            snapshot.append(Arrays.asList("d", "e", "f"));
            snapshot.append(Arrays.asList("g", "h", "i"));
            snapshot.freeze();
        }

        try (RecordsIterator<String> iterator = createIterator(
                MemoryRecords.readableRecords(buffer.get()), BufferSupplier.NO_CACHING, true)) {
            // First batch: the snapshot header control record.
            Batch<String> batch = iterator.next();
            Assertions.assertEquals(1, batch.controlRecords().size());
            Assertions.assertEquals(ControlRecordType.SNAPSHOT_HEADER, batch.controlRecords().get(0).type());
            Assertions.assertEquals(new SnapshotHeaderRecord(), batch.controlRecords().get(0).message());

            // Skip data batches until the next batch that carries control records.
            do {
                batch = iterator.next();
            } while (batch.controlRecords().isEmpty());

            // That batch must be the snapshot footer, and it must be the last one.
            Assertions.assertEquals(1, batch.controlRecords().size());
            Assertions.assertEquals(ControlRecordType.SNAPSHOT_FOOTER, batch.controlRecords().get(0).type());
            Assertions.assertEquals(new SnapshotFooterRecord(), batch.controlRecords().get(0).message());
            Assertions.assertFalse(iterator.hasNext());
        }
    }

    /**
     * For each supported control record type, builds a single control batch of that type and
     * asserts the iterator returns exactly one batch whose control records equal a single
     * {@link ControlRecord} wrapping a default-constructed message of the matching class.
     *
     * @param type the control record type under test
     */
    @ParameterizedTest
    @EnumSource(value = ControlRecordType.class, names = {"LEADER_CHANGE", "SNAPSHOT_HEADER", "SNAPSHOT_FOOTER"})
    void testWithAllSupportedControlRecords(ControlRecordType type) {
        MemoryRecords records = buildControlRecords(type);

        // Declared as the common ApiMessage type: the three branches assign different message classes.
        final ApiMessage expectedMessage;
        switch (type) {
            case LEADER_CHANGE:
                expectedMessage = new LeaderChangeMessage();
                break;
            case SNAPSHOT_HEADER:
                expectedMessage = new SnapshotHeaderRecord();
                break;
            case SNAPSHOT_FOOTER:
                expectedMessage = new SnapshotFooterRecord();
                break;
            default:
                throw new RuntimeException("Should not happen. Poorly configured test");
        }

        try (RecordsIterator<String> iterator = createIterator(records, BufferSupplier.NO_CACHING, true)) {
            Assertions.assertTrue(iterator.hasNext());
            Assertions.assertEquals(
                Collections.singletonList(new ControlRecord(type, expectedMessage)),
                iterator.next().controlRecords()
            );
            Assertions.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Pins the number of {@link ControlRecordType} constants at 6 so that adding or removing a
     * constant makes this test fail.
     * NOTE(review): presumably a reminder to revisit the control-record handling covered by the
     * tests in this class when the enum changes; confirm intent.
     */
    @Test
    void testControlRecordTypeValues() {
        int knownTypeCount = ControlRecordType.values().length;
        Assertions.assertEquals(6, knownTypeCount);
    }

    /**
     * Drives a {@link RecordsIterator} over {@code records} and checks that it yields exactly
     * {@code expectedBatches} in order, then reports exhaustion, and that every buffer obtained
     * from the buffer supplier has been released once the iterator is closed.
     *
     * @param expectedBatches batches the iterator must produce, in order
     * @param records         the records to iterate over
     * @param validateCrc     forwarded to the iterator to enable or disable CRC validation
     */
    private void testIterator(List<TestBatch<String>> expectedBatches, Records records, boolean validateCrc) {
        // Identity semantics: buffers are tracked per instance, not by content equality.
        Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());

        try (RecordsIterator<String> iterator = createIterator(
                records, mockBufferSupplier(allocatedBuffers), validateCrc)) {
            for (TestBatch<String> expected : expectedBatches) {
                Assertions.assertTrue(iterator.hasNext());
                Assertions.assertEquals(expected, TestBatch.from(iterator.next()));
            }

            Assertions.assertFalse(iterator.hasNext());
            Assertions.assertThrows(NoSuchElementException.class, iterator::next);
        }

        // After close, nothing handed out by the supplier may remain outstanding.
        Assertions.assertEquals(Collections.emptySet(), allocatedBuffers);
    }

    /**
     * Creates a {@code String}-typed {@link RecordsIterator} over {@code records}.
     *
     * @param records        the records to iterate over
     * @param bufferSupplier supplier the iterator uses to obtain and release buffers
     * @param validateCrc    whether the iterator should validate batch CRCs
     * @return a new iterator; the caller is responsible for closing it
     */
    static RecordsIterator<String> createIterator(Records records, BufferSupplier bufferSupplier, boolean validateCrc) {
        // Records.HEADER_SIZE_UP_TO_MAGIC (17) is the size argument the iterator is given here.
        return new RecordsIterator<>(
            records,
            STRING_SERDE,
            bufferSupplier,
            Records.HEADER_SIZE_UP_TO_MAGIC,
            validateCrc
        );
    }

    /**
     * Builds a Mockito-backed {@link BufferSupplier} that tracks outstanding buffers.
     *
     * <p>{@code get(int)} allocates a new {@link ByteBuffer} of the requested size and adds it to
     * {@code buffers}; {@code release(ByteBuffer)} removes the released buffer from {@code buffers}.
     *
     * @param buffers set that is mutated to reflect the buffers currently handed out
     * @return the mocked supplier
     */
    static BufferSupplier mockBufferSupplier(Set<ByteBuffer> buffers) {
        BufferSupplier supplier = Mockito.mock(BufferSupplier.class);

        // Allocation: hand out a fresh buffer and remember it.
        Mockito.when(supplier.get(Mockito.anyInt())).thenAnswer(invocation -> {
            Integer requestedSize = invocation.getArgument(0);
            ByteBuffer allocated = ByteBuffer.allocate(requestedSize);
            buffers.add(allocated);
            return allocated;
        });

        // Release: forget the buffer that was given back.
        Mockito.doAnswer(invocation -> {
            ByteBuffer givenBack = invocation.getArgument(0);
            buffers.remove(givenBack);
            return null;
        }).when(supplier).release(Mockito.any(ByteBuffer.class));

        return supplier;
    }

    /**
     * Deterministically generates between 1 and 100 test batches from {@code seed}.
     *
     * <p>Each batch holds between 1 and 100 single-digit string records. Base offsets are
     * contiguous (each batch starts where the previous one ended), the epoch may grow by 0-2 after
     * every fifth batch (indices 0, 5, 10, ...), and the append timestamp grows by 0-999 per batch.
     *
     * @param seed seed for the pseudo-random generator; equal seeds give equal batches
     * @return the generated batches, in offset order
     */
    public static List<TestBatch<String>> createBatches(long seed) {
        // The order of calls on `random` determines the generated data; do not reorder them.
        Random random = new Random(seed);
        long baseOffset = random.nextInt(100);
        int epoch = random.nextInt(3) + 1;
        long appendTimestamp = random.nextInt(1000);

        int numberOfBatches = random.nextInt(100) + 1;
        List<TestBatch<String>> batches = new ArrayList<>(numberOfBatches);
        for (int i = 0; i < numberOfBatches; i++) {
            int numberOfRecords = random.nextInt(100) + 1;
            List<String> records = random
                .ints(numberOfRecords, 0, 10)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());

            batches.add(new TestBatch<>(baseOffset, epoch, appendTimestamp, records));
            baseOffset += records.size();
            if (i % 5 == 0) {
                epoch += random.nextInt(3);
            }
            appendTimestamp += random.nextInt(1000);
        }
        return batches;
    }

    /**
     * Builds a {@link MemoryRecords} holding a single control record of the requested type with a
     * default-constructed message, written into a fresh 128-byte buffer.
     *
     * @param type LEADER_CHANGE, SNAPSHOT_HEADER or SNAPSHOT_FOOTER
     * @return the records containing the control batch
     * @throws RuntimeException if {@code type} is any other control record type
     */
    public static MemoryRecords buildControlRecords(ControlRecordType type) {
        switch (type) {
            case LEADER_CHANGE:
                return MemoryRecords.withLeaderChangeMessage(
                    0L, 0L, 1, ByteBuffer.allocate(128), new LeaderChangeMessage());
            case SNAPSHOT_HEADER:
                return MemoryRecords.withSnapshotHeaderRecord(
                    0L, 0L, 1, ByteBuffer.allocate(128), new SnapshotHeaderRecord());
            case SNAPSHOT_FOOTER:
                return MemoryRecords.withSnapshotFooterRecord(
                    0L, 0L, 1, ByteBuffer.allocate(128), new SnapshotFooterRecord());
            default:
                throw new RuntimeException(String.format("Control record type %s is not supported", type));
        }
    }

    /**
     * Serializes {@code batches} back-to-back into one 102400-byte buffer, one
     * {@link BatchBuilder} per test batch, and returns the result as readable records.
     *
     * @param compressionType compression applied to every batch
     * @param batches         the batches to write, in order
     * @return records positioned for reading from the start of the first batch
     */
    public static MemoryRecords buildRecords(CompressionType compressionType, List<TestBatch<String>> batches) {
        ByteBuffer buffer = ByteBuffer.allocate(102400);

        for (TestBatch<String> batch : batches) {
            BatchBuilder<String> builder = new BatchBuilder<>(
                buffer,
                STRING_SERDE,
                compressionType,
                batch.baseOffset,
                batch.appendTimestamp,
                false,
                batch.epoch,
                1024
            );

            for (String record : batch.records) {
                builder.appendRecord(record, null);
            }

            // The returned records are not needed here; the shared buffer is read back below.
            builder.build();
        }

        buffer.flip();
        return MemoryRecords.readableRecords(buffer);
    }

    /**
     * Immutable value holder describing the parts of a batch these tests compare: base offset,
     * epoch, append timestamp and the record payloads.
     *
     * @param <T> the record payload type
     */
    public static final class TestBatch<T> {
        final long baseOffset;
        final int epoch;
        final long appendTimestamp;
        final List<T> records;

        TestBatch(long baseOffset, int epoch, long appendTimestamp, List<T> records) {
            this.baseOffset = baseOffset;
            this.epoch = epoch;
            this.appendTimestamp = appendTimestamp;
            this.records = records;
        }

        @Override
        public String toString() {
            return String.format(
                "TestBatch(baseOffset=%s, epoch=%s, appendTimestamp=%s, records=%s)",
                baseOffset,
                epoch,
                appendTimestamp,
                records
            );
        }

        /**
         * Two test batches are equal when all four fields match. The append timestamp is
         * included so that a timestamp lost or altered on the write/read round trip is detected.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            TestBatch<?> other = (TestBatch<?>) o;
            return baseOffset == other.baseOffset
                && epoch == other.epoch
                && appendTimestamp == other.appendTimestamp
                && Objects.equals(records, other.records);
        }

        @Override
        public int hashCode() {
            return Objects.hash(baseOffset, epoch, appendTimestamp, records);
        }

        /**
         * Captures the compared fields of a {@link Batch} produced by the code under test.
         *
         * @param batch the batch to copy from
         * @return a test batch with the same base offset, epoch, append timestamp and records
         */
        static <T> TestBatch<T> from(Batch<T> batch) {
            return new TestBatch<>(batch.baseOffset(), batch.epoch(), batch.appendTimestamp(), batch.records());
        }
    }
}

