/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateChunkReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.RecoveredChannelStateHandler;
import org.apache.flink.runtime.checkpoint.channel.TestException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ChannelStateChunkReaderTest {
    // Explicit package-private no-arg constructor; it performs no initialization.
    ChannelStateChunkReaderTest() {
    }

    /**
     * Verifies that every buffer handed out by the handler ends up recycled when the
     * serializer's {@code readData} throws while a chunk is being read.
     */
    @Test
    void testBufferRecycledOnFailure() {
        FailingChannelStateSerializer serializer = new FailingChannelStateSerializer();
        TestRecoveredChannelStateHandler handler = new TestRecoveredChannelStateHandler();

        Assertions.assertThatThrownBy(() -> {
            try (FSDataInputStream stream = getStream(serializer, 10)) {
                new ChannelStateChunkReader(serializer)
                        .readChunk(stream, serializer.getHeaderLength(), handler, "channelInfo", 0);
            } finally {
                // Guard against a vacuous pass: the failing readData must have been reached
                // and at least one buffer must have been requested, whether or not the read threw.
                Preconditions.checkState(serializer.failed);
                Preconditions.checkState(!handler.requestedBuffers.isEmpty());
            }
        }).isInstanceOf(TestException.class);

        Assertions.assertThat(handler.requestedBuffers).allMatch(TestChannelStateByteBuffer::isRecycled);
    }

    /**
     * Verifies that every buffer handed out by the handler ends up recycled after a chunk with
     * a 10-byte payload has been read.
     *
     * @throws IOException if building or reading the in-memory stream fails
     * @throws InterruptedException if {@code readChunk} is interrupted
     */
    @Test
    void testBufferRecycledOnSuccess() throws IOException, InterruptedException {
        ChannelStateSerializerImpl serializer = new ChannelStateSerializerImpl();
        TestRecoveredChannelStateHandler handler = new TestRecoveredChannelStateHandler();

        try (FSDataInputStream stream = getStream(serializer, 10)) {
            new ChannelStateChunkReader(serializer)
                    .readChunk(stream, serializer.getHeaderLength(), handler, "channelInfo", 0);
        } finally {
            // Runs on both the normal and the exceptional path: the test is only meaningful
            // if a buffer was requested, and every requested buffer must have been recycled.
            Preconditions.checkState(!handler.requestedBuffers.isEmpty());
            Assertions.assertThat(handler.requestedBuffers).allMatch(TestChannelStateByteBuffer::isRecycled);
        }
    }

    /**
     * Reads a chunk built from a zero-length data buffer and verifies that the handler was
     * never asked for a buffer.
     *
     * @throws IOException if building or reading the in-memory stream fails
     * @throws InterruptedException if {@code readChunk} is interrupted
     */
    @Test
    void testBuffersNotRequestedForEmptyStream() throws IOException, InterruptedException {
        final ChannelStateSerializerImpl stateSerializer = new ChannelStateSerializerImpl();
        final TestRecoveredChannelStateHandler recordingHandler = new TestRecoveredChannelStateHandler();

        try (FSDataInputStream emptyPayload = getStream(stateSerializer, 0)) {
            ChannelStateChunkReader chunkReader = new ChannelStateChunkReader(stateSerializer);
            chunkReader.readChunk(
                    emptyPayload,
                    stateSerializer.getHeaderLength(),
                    recordingHandler,
                    "channelInfo",
                    0);
        } finally {
            // Checked even if the read throws, so an unexpected buffer request is still reported.
            Assertions.assertThat(recordingHandler.requestedBuffers).isEmpty();
        }
    }

    /**
     * Verifies that {@code readChunk} does not call {@code seek} when the stream already
     * reports the requested offset as its current position.
     *
     * @throws IOException if {@code readChunk} fails to read from the stub stream
     * @throws InterruptedException if {@code readChunk} is interrupted
     */
    @Test
    void testNoSeekUnnecessarily() throws IOException, InterruptedException {
        // Single source of truth for both the stream's reported position and the offset
        // requested from the reader; the two must be equal for the test to be meaningful.
        final long offset = 123L;
        FSDataInputStream stream = new FSDataInputStream() {

            public long getPos() {
                return offset;
            }

            public void seek(long ignored) {
                Assertions.fail("It shouldn't be called.");
            }

            public int read() {
                // Endless stream of zero bytes.
                return 0;
            }
        };

        new ChannelStateChunkReader(new ChannelStateSerializerImpl())
                .readChunk(stream, offset, new TestRecoveredChannelStateHandler(), "channelInfo", 0);
    }

    /**
     * Builds an in-memory input stream containing whatever the given serializer emits for its
     * header followed by a single data buffer of {@code size} zero-filled bytes.
     *
     * @param serializer serializer used to write the header and the data buffer
     * @param size number of bytes in the single data buffer
     * @return a stream opened over the serialized bytes
     * @throws IOException if serialization or opening the stream fails
     */
    private static FSDataInputStream getStream(ChannelStateSerializer serializer, int size) throws IOException {
        try (ByteArrayOutputStream serialized = new ByteArrayOutputStream()) {
            DataOutputStream sink = new DataOutputStream(serialized);
            serializer.writeHeader(sink);

            NetworkBuffer payload = new NetworkBuffer(
                    MemorySegmentFactory.wrap(new byte[size]),
                    FreeingBufferRecycler.INSTANCE,
                    Buffer.DataType.DATA_BUFFER,
                    size);
            serializer.writeData(sink, new Buffer[] {payload});

            // Push anything buffered by the DataOutputStream into the byte array before copying it.
            sink.flush();
            return new ByteStreamStateHandle("", serialized.toByteArray()).openInputStream();
        }
    }

    /**
     * Serializer whose {@code readData} always throws {@link TestException}, after recording
     * in {@code failed} that it was invoked.
     */
    private static class FailingChannelStateSerializer extends ChannelStateSerializerImpl {
        /** Becomes {@code true} the first time {@code readData} is called. */
        private boolean failed;

        /**
         * Marks this serializer as failed and throws; none of the arguments are used.
         *
         * @throws TestException always
         */
        public int readData(InputStream stream, ChannelStateByteBuffer buffer, int bytes) {
            failed = true;
            throw new TestException();
        }
    }

    /**
     * Handler fake that records every buffer it hands out in {@code requestedBuffers} and
     * closes the buffer as soon as it is passed back to {@code recover}.
     */
    private static class TestRecoveredChannelStateHandler
            implements RecoveredChannelStateHandler<Object, Object> {
        /** Every buffer returned from {@code getBuffer}, in request order. */
        private final List<TestChannelStateByteBuffer> requestedBuffers = new ArrayList<>();

        private TestRecoveredChannelStateHandler() {
        }

        /**
         * Creates a fresh fake buffer, records it, and returns it with a {@code null} context.
         *
         * @param o channel info; unused
         */
        public RecoveredChannelStateHandler.BufferWithContext<Object> getBuffer(Object o) {
            TestChannelStateByteBuffer buffer = new TestChannelStateByteBuffer();
            requestedBuffers.add(buffer);
            // Diamond instead of a raw type: the context type is inferred from the return type.
            return new RecoveredChannelStateHandler.BufferWithContext<>(buffer, null);
        }

        /** Closes the supplied buffer-with-context; the other arguments are ignored. */
        public void recover(Object o, int oldSubtaskIndex, RecoveredChannelStateHandler.BufferWithContext<Object> bufferWithContext) {
            bufferWithContext.close();
        }

        /** No-op: this fake holds nothing that needs releasing. */
        public void close() throws Exception {
        }
    }

    /**
     * Buffer fake that discards the bytes written to it and remembers whether it has been
     * closed ("recycled"). Closing twice, or writing after close, is rejected.
     */
    private static class TestChannelStateByteBuffer
            implements ChannelStateByteBuffer {
        /** {@code true} once {@link #close()} has been called. */
        private boolean recycled;

        private TestChannelStateByteBuffer() {
        }

        /** Always reports the buffer as writable. */
        public boolean isWritable() {
            return true;
        }

        /**
         * Marks the buffer as recycled.
         *
         * @throws IllegalArgumentException if the buffer was already recycled
         */
        public void close() {
            Preconditions.checkArgument(!recycled);
            recycled = true;
        }

        /** Returns whether {@link #close()} has been called. */
        public boolean isRecycled() {
            return recycled;
        }

        /**
         * Discards {@code bytesToRead} bytes from {@code input}.
         *
         * @param input stream to consume from
         * @param bytesToRead number of bytes to discard; non-positive values consume nothing
         * @return {@code bytesToRead}
         * @throws IOException if the stream fails or ends before all bytes were discarded
         * @throws IllegalArgumentException if the buffer was already recycled
         */
        public int writeBytes(InputStream input, int bytesToRead) throws IOException {
            Preconditions.checkArgument(!recycled);
            long remaining = bytesToRead;
            while (remaining > 0) {
                // InputStream.skip may skip fewer bytes than requested, so keep going until done.
                long skipped = input.skip(remaining);
                if (skipped > 0) {
                    remaining -= skipped;
                } else if (input.read() >= 0) {
                    // skip() may return 0 before end of stream; a one-byte read tells the two apart.
                    remaining--;
                } else {
                    throw new IOException(
                            "Stream ended with " + remaining + " of " + bytesToRead + " bytes still to discard");
                }
            }
            return bytesToRead;
        }
    }
}

