/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.io.EOFException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SpillingBufferTest {
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_LENGTH = 114;
    private static final int NUM_PAIRS_INMEM = 6000;
    private static final int NUM_PAIRS_EXTERNAL = 30000;
    private static final int MEMORY_SIZE = 0x100000;
    private static final int NUM_MEMORY_SEGMENTS = 23;
    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;

    SpillingBufferTest() {
    }

    @BeforeEach
    void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x100000L).build();
        this.ioManager = new IOManagerAsync();
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.memoryManager.verifyEmpty()).withFailMessage("Memory leak: not all segments have been returned to the memory manager.", new Object[0])).isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    void testWriteReadInMemory() throws Exception {
        String v2;
        int k2;
        String v1;
        int k1;
        int i;
        TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
        ArrayList memory = new ArrayList(23);
        this.memoryManager.allocatePages((Object)this.parentTask, memory, 23);
        SpillingBuffer outView = new SpillingBuffer(this.ioManager, (MemorySegmentSource)new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
        Tuple2 rec = new Tuple2();
        for (int i2 = 0; i2 < 6000; ++i2) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.serialize((Object)rec, (DataOutputView)outView);
        }
        DataInputView inView = outView.flip();
        generator.reset();
        Tuple2 readRec = new Tuple2();
        for (i = 0; i < 6000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, inView);
            k1 = (Integer)rec.f0;
            v1 = (String)rec.f1;
            k2 = (Integer)readRec.f0;
            v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        inView = outView.flip();
        generator.reset();
        for (i = 0; i < 6000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, inView);
            k1 = (Integer)rec.f0;
            v1 = (String)rec.f1;
            k2 = (Integer)readRec.f0;
            v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        this.memoryManager.release((Collection)outView.close());
        this.memoryManager.release(memory);
    }

    @Test
    void testWriteReadTooMuchInMemory() throws Exception {
        TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
        ArrayList memory = new ArrayList(23);
        this.memoryManager.allocatePages((Object)this.parentTask, memory, 23);
        SpillingBuffer outView = new SpillingBuffer(this.ioManager, (MemorySegmentSource)new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
        Tuple2 rec = new Tuple2();
        for (int i = 0; i < 6000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.serialize((Object)rec, (DataOutputView)outView);
        }
        DataInputView inView = outView.flip();
        generator.reset();
        Tuple2 readRec = new Tuple2();
        for (int i = 0; i < 6000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, inView);
            int k1 = (Integer)rec.f0;
            String v1 = (String)rec.f1;
            int k2 = (Integer)readRec.f0;
            String v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        generator.next((Tuple2<Integer, String>)rec);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> serializer.deserialize((Object)readRec, inView)).withFailMessage("Read too much, expected EOFException.", new Object[0])).isInstanceOf(EOFException.class);
        DataInputView nextInView = outView.flip();
        generator.reset();
        for (int i = 0; i < 6000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, nextInView);
            int k1 = (Integer)rec.f0;
            String v1 = (String)rec.f1;
            int k2 = (Integer)readRec.f0;
            String v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        this.memoryManager.release((Collection)outView.close());
        this.memoryManager.release(memory);
    }

    @Test
    void testWriteReadExternal() throws Exception {
        String v2;
        int k2;
        String v1;
        int k1;
        int i;
        TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
        ArrayList memory = new ArrayList(23);
        this.memoryManager.allocatePages((Object)this.parentTask, memory, 23);
        SpillingBuffer outView = new SpillingBuffer(this.ioManager, (MemorySegmentSource)new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
        Tuple2 rec = new Tuple2();
        for (int i2 = 0; i2 < 30000; ++i2) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.serialize((Object)rec, (DataOutputView)outView);
        }
        DataInputView inView = outView.flip();
        generator.reset();
        Tuple2 readRec = new Tuple2();
        for (i = 0; i < 30000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, inView);
            k1 = (Integer)rec.f0;
            v1 = (String)rec.f1;
            k2 = (Integer)readRec.f0;
            v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        inView = outView.flip();
        generator.reset();
        for (i = 0; i < 30000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, inView);
            k1 = (Integer)rec.f0;
            v1 = (String)rec.f1;
            k2 = (Integer)readRec.f0;
            v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        this.memoryManager.release((Collection)outView.close());
        this.memoryManager.release(memory);
    }

    @Test
    void testWriteReadTooMuchExternal() throws Exception {
        TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
        ArrayList memory = new ArrayList(23);
        this.memoryManager.allocatePages((Object)this.parentTask, memory, 23);
        SpillingBuffer outView = new SpillingBuffer(this.ioManager, (MemorySegmentSource)new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
        Tuple2 rec = new Tuple2();
        for (int i = 0; i < 30000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.serialize((Object)rec, (DataOutputView)outView);
        }
        DataInputView inView = outView.flip();
        generator.reset();
        Tuple2 readRec = new Tuple2();
        for (int i = 0; i < 30000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, inView);
            int k1 = (Integer)rec.f0;
            String v1 = (String)rec.f1;
            int k2 = (Integer)readRec.f0;
            String v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        generator.next((Tuple2<Integer, String>)rec);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> serializer.deserialize((Object)readRec, inView)).withFailMessage("Read too much, expected EOFException.", new Object[0])).isInstanceOf(EOFException.class);
        DataInputView nextInView = outView.flip();
        generator.reset();
        for (int i = 0; i < 30000; ++i) {
            generator.next((Tuple2<Integer, String>)rec);
            serializer.deserialize((Object)readRec, nextInView);
            int k1 = (Integer)rec.f0;
            String v1 = (String)rec.f1;
            int k2 = (Integer)readRec.f0;
            String v2 = (String)readRec.f1;
            ((AbstractBooleanAssert)Assertions.assertThat((k1 == k2 && v1.equals(v2) ? 1 : 0) != 0).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0])).isTrue();
        }
        this.memoryManager.release((Collection)outView.close());
        this.memoryManager.release(memory);
    }
}

