/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ReduceTaskExternalITCase
extends DriverTestBase<GroupReduceFunction<Record, Record>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceTaskExternalITCase.class);
    private final RecordComparator comparator = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    private final List<Record> outList = new ArrayList<Record>();

    ReduceTaskExternalITCase(ExecutionConfig config) {
        super(config, 0L, 1, 0x300000L);
    }

    @TestTemplate
    void testSingleLevelMergeReduceTask() {
        int keyCnt = 8192;
        int valCnt = 8;
        this.setNumFileHandlesForSort(2);
        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try {
            this.addInputSorted(new UniformRecordGenerator(8192, 8, false), this.comparator.duplicate());
            GroupReduceDriver testTask = new GroupReduceDriver();
            this.testDriver((Driver)testTask, MockReduceStub.class);
        }
        catch (Exception e) {
            LOG.info("Exception while running the test task.", (Throwable)e);
            Assertions.fail((String)("Exception in Test: " + e.getMessage()));
        }
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), 8192})).hasSize(8192);
        for (Record record : this.outList) {
            ((AbstractIntegerAssert)Assertions.assertThat((int)((IntValue)record.getField(1, IntValue.class)).getValue()).withFailMessage("Incorrect result", new Object[0])).isEqualTo(8 - ((IntValue)record.getField(0, IntValue.class)).getValue());
        }
        this.outList.clear();
    }

    @TestTemplate
    void testMultiLevelMergeReduceTask() {
        int keyCnt = 32768;
        int valCnt = 8;
        this.setNumFileHandlesForSort(2);
        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try {
            this.addInputSorted(new UniformRecordGenerator(32768, 8, false), this.comparator.duplicate());
            GroupReduceDriver testTask = new GroupReduceDriver();
            this.testDriver((Driver)testTask, MockReduceStub.class);
        }
        catch (Exception e) {
            LOG.info("Exception while running the test task.", (Throwable)e);
            Assertions.fail((String)("Exception in Test: " + e.getMessage()));
        }
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), 32768})).hasSize(32768);
        for (Record record : this.outList) {
            ((AbstractIntegerAssert)Assertions.assertThat((int)((IntValue)record.getField(1, IntValue.class)).getValue()).withFailMessage("Incorrect result", new Object[0])).isEqualTo(8 - ((IntValue)record.getField(0, IntValue.class)).getValue());
        }
        this.outList.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testSingleLevelMergeCombiningReduceTask() throws IOException {
        int keyCnt = 8192;
        int valCnt = 8;
        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try (ExternalSorter sorter = null;){
            sorter = ExternalSorter.newBuilder((MemoryManager)this.getMemoryManager(), (AbstractInvokable)this.getContainingTask(), RecordSerializerFactory.get().getSerializer(), (TypeComparator)this.comparator.duplicate()).maxNumFileHandles(2).withCombiner((GroupCombineFunction)new MockCombiningReduceStub()).enableSpilling(this.getIOManager(), (double)0.8f).memoryFraction(this.perSortFractionMem).objectReuse(true).largeRecords(true).build((MutableObjectIterator)new UniformRecordGenerator(8192, 8, false));
            this.addInput((MutableObjectIterator<Record>)sorter.getIterator());
            GroupReduceDriver testTask = new GroupReduceDriver();
            this.testDriver((Driver)testTask, MockCombiningReduceStub.class);
        }
        int expSum = 0;
        for (int i = 1; i < 8; ++i) {
            expSum += i;
        }
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), 8192})).hasSize(8192);
        for (Record record : this.outList) {
            ((AbstractIntegerAssert)Assertions.assertThat((int)((IntValue)record.getField(1, IntValue.class)).getValue()).withFailMessage("Incorrect result", new Object[0])).isEqualTo(expSum - ((IntValue)record.getField(0, IntValue.class)).getValue());
        }
        this.outList.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMultiLevelMergeCombiningReduceTask() throws IOException {
        int keyCnt = 32768;
        int valCnt = 8;
        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try (ExternalSorter sorter = null;){
            sorter = ExternalSorter.newBuilder((MemoryManager)this.getMemoryManager(), (AbstractInvokable)this.getContainingTask(), RecordSerializerFactory.get().getSerializer(), (TypeComparator)this.comparator.duplicate()).maxNumFileHandles(2).withCombiner((GroupCombineFunction)new MockCombiningReduceStub()).enableSpilling(this.getIOManager(), (double)0.8f).memoryFraction(this.perSortFractionMem).objectReuse(false).largeRecords(true).build((MutableObjectIterator)new UniformRecordGenerator(keyCnt, valCnt, false));
            this.addInput((MutableObjectIterator<Record>)sorter.getIterator());
            GroupReduceDriver testTask = new GroupReduceDriver();
            this.testDriver((Driver)testTask, MockCombiningReduceStub.class);
        }
        int expSum = 0;
        for (int i = 1; i < valCnt; ++i) {
            expSum += i;
        }
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), keyCnt})).hasSize(keyCnt);
        for (Record record : this.outList) {
            ((AbstractIntegerAssert)Assertions.assertThat((int)((IntValue)record.getField(1, IntValue.class)).getValue()).withFailMessage("Incorrect result", new Object[0])).isEqualTo(expSum - ((IntValue)record.getField(0, IntValue.class)).getValue());
        }
        this.outList.clear();
    }

    public static class MockReduceStub
    extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int cnt = 0;
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record next;
                element = next = iterator.next();
                ++cnt;
            }
            element.getField(0, (Value)this.key);
            this.value.setValue(cnt - this.key.getValue());
            element.setField(1, (Value)this.value);
            out.collect((Object)element);
        }
    }

    public static class MockCombiningReduceStub
    implements GroupReduceFunction<Record, Record>,
    GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private final IntValue combineValue = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record next;
                element = next = iterator.next();
                element.getField(1, (Value)this.value);
                sum += this.value.getValue();
            }
            element.getField(0, (Value)this.key);
            this.value.setValue(sum - this.key.getValue());
            element.setField(1, (Value)this.value);
            out.collect((Object)element);
        }

        public void combine(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record next;
                element = next = iterator.next();
                element.getField(1, (Value)this.combineValue);
                sum += this.combineValue.getValue();
            }
            this.combineValue.setValue(sum);
            element.setField(1, (Value)this.combineValue);
            out.collect((Object)element);
        }
    }
}

