/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

class BatchAccumulatorTest {
    private final MemoryPool memoryPool = (MemoryPool)Mockito.mock(MemoryPool.class);
    private final MockTime time = new MockTime();
    private final StringSerde serde = new StringSerde();
    static final Appender APPEND_ATOMIC = new Appender(){

        @Override
        public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
            return acc.append(epoch, records, OptionalLong.empty(), true);
        }
    };
    static final Appender APPEND = new Appender(){

        @Override
        public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
            return acc.append(epoch, records, OptionalLong.empty(), false);
        }
    };

    BatchAccumulatorTest() {
    }

    private BatchAccumulator<String> buildAccumulator(int leaderEpoch, long baseOffset, int lingerMs, int maxBatchSize) {
        return new BatchAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize, this.memoryPool, (Time)this.time, CompressionType.NONE, (RecordSerde)this.serde);
    }

    @Test
    public void testLeaderChangeMessageWritten() {
        int leaderEpoch = 17;
        long baseOffset = 0L;
        int lingerMs = 50;
        int maxBatchSize = 512;
        ByteBuffer buffer = ByteBuffer.allocate(256);
        Mockito.when((Object)this.memoryPool.tryAllocate(256)).thenReturn((Object)buffer);
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        acc.appendLeaderChangeMessage(new LeaderChangeMessage(), this.time.milliseconds());
        Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
        List batches = acc.drain();
        Assertions.assertEquals((int)1, (int)batches.size());
        BatchAccumulator.CompletedBatch batch = (BatchAccumulator.CompletedBatch)batches.get(0);
        batch.release();
        ((MemoryPool)Mockito.verify((Object)this.memoryPool)).release(buffer);
    }

    @Test
    public void testForceDrain() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            int leaderEpoch = 17;
            long baseOffset = 157L;
            int lingerMs = 50;
            int maxBatchSize = 512;
            Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
            BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
            List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
            Assertions.assertEquals((long)baseOffset, (Long)appender.call(acc, leaderEpoch, records.subList(0, 1)));
            Assertions.assertEquals((long)(baseOffset + 2L), (Long)appender.call(acc, leaderEpoch, records.subList(1, 3)));
            Assertions.assertEquals((long)(baseOffset + 5L), (Long)appender.call(acc, leaderEpoch, records.subList(3, 6)));
            Assertions.assertEquals((long)(baseOffset + 7L), (Long)appender.call(acc, leaderEpoch, records.subList(6, 8)));
            Assertions.assertEquals((long)(baseOffset + 8L), (Long)appender.call(acc, leaderEpoch, records.subList(8, 9)));
            Assertions.assertFalse((boolean)acc.needsDrain(this.time.milliseconds()));
            acc.forceDrain();
            Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals((long)0L, (long)acc.timeUntilDrain(this.time.milliseconds()));
            List batches = acc.drain();
            Assertions.assertEquals((int)1, (int)batches.size());
            Assertions.assertFalse((boolean)acc.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals((long)(Long.MAX_VALUE - this.time.milliseconds()), (long)acc.timeUntilDrain(this.time.milliseconds()));
            BatchAccumulator.CompletedBatch batch = (BatchAccumulator.CompletedBatch)batches.get(0);
            Assertions.assertEquals(records, batch.records.get());
            Assertions.assertEquals((long)baseOffset, (long)batch.baseOffset);
            Assertions.assertEquals((long)this.time.milliseconds(), (long)batch.appendTimestamp());
        });
    }

    @Test
    public void testForceDrainBeforeAppendLeaderChangeMessage() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            int leaderEpoch = 17;
            long baseOffset = 157L;
            int lingerMs = 50;
            int maxBatchSize = 512;
            Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
            Mockito.when((Object)this.memoryPool.tryAllocate(256)).thenReturn((Object)ByteBuffer.allocate(256));
            BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
            List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
            Assertions.assertEquals((long)baseOffset, (Long)appender.call(acc, leaderEpoch, records.subList(0, 1)));
            Assertions.assertEquals((long)(baseOffset + 2L), (Long)appender.call(acc, leaderEpoch, records.subList(1, 3)));
            Assertions.assertEquals((long)(baseOffset + 5L), (Long)appender.call(acc, leaderEpoch, records.subList(3, 6)));
            Assertions.assertEquals((long)(baseOffset + 7L), (Long)appender.call(acc, leaderEpoch, records.subList(6, 8)));
            Assertions.assertEquals((long)(baseOffset + 8L), (Long)appender.call(acc, leaderEpoch, records.subList(8, 9)));
            Assertions.assertFalse((boolean)acc.needsDrain(this.time.milliseconds()));
            acc.appendLeaderChangeMessage(new LeaderChangeMessage(), this.time.milliseconds());
            Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals((long)0L, (long)acc.timeUntilDrain(this.time.milliseconds()));
            List batches = acc.drain();
            Assertions.assertEquals((int)2, (int)batches.size());
            Assertions.assertFalse((boolean)acc.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals((long)(Long.MAX_VALUE - this.time.milliseconds()), (long)acc.timeUntilDrain(this.time.milliseconds()));
            BatchAccumulator.CompletedBatch batch = (BatchAccumulator.CompletedBatch)batches.get(0);
            Assertions.assertEquals(records, batch.records.get());
            Assertions.assertEquals((long)baseOffset, (long)batch.baseOffset);
            Assertions.assertEquals((long)this.time.milliseconds(), (long)batch.appendTimestamp());
        });
    }

    @Test
    public void testLingerIgnoredIfAccumulatorEmpty() {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 512;
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        Assertions.assertTrue((boolean)acc.isEmpty());
        Assertions.assertFalse((boolean)acc.needsDrain(this.time.milliseconds()));
        Assertions.assertEquals((long)(Long.MAX_VALUE - this.time.milliseconds()), (long)acc.timeUntilDrain(this.time.milliseconds()));
    }

    @Test
    public void testLingerBeginsOnFirstWrite() {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 512;
        Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        this.time.sleep(15L);
        Assertions.assertEquals((long)baseOffset, (long)acc.append(leaderEpoch, Collections.singletonList("a"), OptionalLong.empty(), false));
        Assertions.assertEquals((long)lingerMs, (long)acc.timeUntilDrain(this.time.milliseconds()));
        Assertions.assertFalse((boolean)acc.isEmpty());
        this.time.sleep((long)(lingerMs / 2));
        Assertions.assertEquals((long)(lingerMs / 2), (long)acc.timeUntilDrain(this.time.milliseconds()));
        Assertions.assertFalse((boolean)acc.isEmpty());
        this.time.sleep((long)(lingerMs / 2));
        Assertions.assertEquals((long)0L, (long)acc.timeUntilDrain(this.time.milliseconds()));
        Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
        Assertions.assertFalse((boolean)acc.isEmpty());
    }

    @Test
    public void testCompletedBatchReleaseBuffer() {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 512;
        ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
        Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)buffer);
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        Assertions.assertEquals((long)baseOffset, (long)acc.append(leaderEpoch, Collections.singletonList("a"), OptionalLong.empty(), false));
        this.time.sleep((long)lingerMs);
        List batches = acc.drain();
        Assertions.assertEquals((int)1, (int)batches.size());
        BatchAccumulator.CompletedBatch batch = (BatchAccumulator.CompletedBatch)batches.get(0);
        batch.release();
        ((MemoryPool)Mockito.verify((Object)this.memoryPool)).release(buffer);
    }

    @Test
    public void testUnflushedBuffersReleasedByClose() {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 512;
        ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
        Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)buffer);
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        Assertions.assertEquals((long)baseOffset, (long)acc.append(leaderEpoch, Collections.singletonList("a"), OptionalLong.empty(), false));
        acc.close();
        ((MemoryPool)Mockito.verify((Object)this.memoryPool)).release(buffer);
    }

    @Test
    public void testSingleBatchAccumulation() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            int leaderEpoch = 17;
            long baseOffset = 157L;
            int lingerMs = 50;
            int maxBatchSize = 512;
            Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
            BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
            List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
            Assertions.assertEquals((long)baseOffset, (Long)appender.call(acc, leaderEpoch, records.subList(0, 1)));
            Assertions.assertEquals((long)(baseOffset + 2L), (Long)appender.call(acc, leaderEpoch, records.subList(1, 3)));
            Assertions.assertEquals((long)(baseOffset + 5L), (Long)appender.call(acc, leaderEpoch, records.subList(3, 6)));
            Assertions.assertEquals((long)(baseOffset + 7L), (Long)appender.call(acc, leaderEpoch, records.subList(6, 8)));
            Assertions.assertEquals((long)(baseOffset + 8L), (Long)appender.call(acc, leaderEpoch, records.subList(8, 9)));
            long expectedAppendTimestamp = this.time.milliseconds();
            this.time.sleep((long)lingerMs);
            Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
            List batches = acc.drain();
            Assertions.assertEquals((int)1, (int)batches.size());
            Assertions.assertFalse((boolean)acc.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals((long)(Long.MAX_VALUE - this.time.milliseconds()), (long)acc.timeUntilDrain(this.time.milliseconds()));
            BatchAccumulator.CompletedBatch batch = (BatchAccumulator.CompletedBatch)batches.get(0);
            Assertions.assertEquals(records, batch.records.get());
            Assertions.assertEquals((long)baseOffset, (long)batch.baseOffset);
            Assertions.assertEquals((long)expectedAppendTimestamp, (long)batch.appendTimestamp());
        });
    }

    @Test
    public void testMultipleBatchAccumulation() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            int leaderEpoch = 17;
            long baseOffset = 157L;
            int lingerMs = 50;
            int maxBatchSize = 256;
            Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
            BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
            while (acc.numCompletedBatches() < 3) {
                appender.call(acc, leaderEpoch, Collections.singletonList("foo"));
            }
            List batches = acc.drain();
            Assertions.assertEquals((int)4, (int)batches.size());
            Assertions.assertTrue((boolean)batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
        });
    }

    @Test
    public void testRecordsAreSplit() {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        String record = "a";
        int numberOfRecords = 9;
        int recordsPerBatch = 2;
        int batchHeaderSize = AbstractRecords.recordBatchHeaderSizeInBytes((byte)2, (CompressionType)CompressionType.NONE);
        int maxBatchSize = batchHeaderSize + recordsPerBatch * this.recordSizeInBytes(record, recordsPerBatch);
        Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        List records = Stream.generate(() -> record).limit(numberOfRecords).collect(Collectors.toList());
        Assertions.assertEquals((long)(baseOffset + (long)numberOfRecords - 1L), (long)acc.append(leaderEpoch, records, OptionalLong.empty(), false));
        this.time.sleep((long)lingerMs);
        Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
        List batches = acc.drain();
        int expectedBatches = (records.size() + recordsPerBatch - 1) / recordsPerBatch;
        Assertions.assertEquals((int)expectedBatches, (int)batches.size());
        Assertions.assertTrue((boolean)batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
    }

    @Test
    public void testCloseWhenEmpty() {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 256;
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        acc.close();
        Mockito.verifyNoInteractions((Object[])new Object[]{this.memoryPool});
    }

    @Test
    public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 256;
        StringSerde serde = (StringSerde)Mockito.spy((Object)new StringSerde());
        BatchAccumulator acc = new BatchAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize, this.memoryPool, (Time)this.time, CompressionType.NONE, (RecordSerde)serde);
        CountDownLatch acquireLockLatch = new CountDownLatch(1);
        CountDownLatch releaseLockLatch = new CountDownLatch(1);
        Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)ByteBuffer.allocate(maxBatchSize));
        acc.append(leaderEpoch, Collections.singletonList("a"), OptionalLong.empty(), false);
        ((StringSerde)Mockito.doAnswer(invocation -> {
            Writable writable = (Writable)invocation.getArgument(2);
            acquireLockLatch.countDown();
            releaseLockLatch.await();
            writable.writeByteArray(Utils.utf8((String)"b"));
            return null;
        }).when((Object)serde)).write((String)Mockito.eq((Object)"b"), (ObjectSerializationCache)Mockito.any(ObjectSerializationCache.class), (Writable)Mockito.any(Writable.class));
        Thread appendThread = new Thread(() -> acc.append(leaderEpoch, Collections.singletonList("b"), OptionalLong.empty(), false));
        appendThread.start();
        acquireLockLatch.await();
        this.time.sleep((long)lingerMs);
        Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
        Assertions.assertEquals(Collections.emptyList(), (Object)acc.drain());
        Assertions.assertTrue((boolean)acc.needsDrain(this.time.milliseconds()));
        releaseLockLatch.countDown();
        appendThread.join();
        List drained = acc.drain();
        Assertions.assertEquals((int)1, (int)drained.size());
        Assertions.assertEquals((long)(Long.MAX_VALUE - this.time.milliseconds()), (long)acc.timeUntilDrain(this.time.milliseconds()));
        drained.stream().forEach(completedBatch -> completedBatch.data.batches().forEach(recordBatch -> Assertions.assertEquals((int)leaderEpoch, (int)recordBatch.partitionLeaderEpoch())));
    }

    int recordSizeInBytes(String record, int numberOfRecords) {
        int serdeSize = this.serde.recordSize("a", new ObjectSerializationCache());
        int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes((int)numberOfRecords, (long)0L, (int)-1, (int)serdeSize, (Header[])DefaultRecord.EMPTY_HEADERS);
        return ByteUtils.sizeOfVarint((int)recordSizeInBytes) + recordSizeInBytes;
    }

    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testAppendWithRequiredBaseOffset(boolean correctOffset) {
        int leaderEpoch = 17;
        long baseOffset = 157L;
        int lingerMs = 50;
        int maxBatchSize = 512;
        ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
        Mockito.when((Object)this.memoryPool.tryAllocate(maxBatchSize)).thenReturn((Object)buffer);
        BatchAccumulator<String> acc = this.buildAccumulator(leaderEpoch, baseOffset, lingerMs, maxBatchSize);
        if (correctOffset) {
            Assertions.assertEquals((long)baseOffset, (long)acc.append(leaderEpoch, Collections.singletonList("a"), OptionalLong.of(baseOffset), true));
        } else {
            Assertions.assertEquals((Object)"Wanted base offset 156, but the next offset was 157", (Object)((UnexpectedBaseOffsetException)Assertions.assertThrows(UnexpectedBaseOffsetException.class, () -> acc.append(leaderEpoch, Collections.singletonList("a"), OptionalLong.of(baseOffset - 1L), true))).getMessage());
        }
        acc.close();
    }

    static interface Appender {
        public Long call(BatchAccumulator<String> var1, int var2, List<String> var3);
    }
}

