/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;

class GenericWriteAheadSinkTest
extends WriteAheadSinkTestBase<Tuple1<Integer>, ListSink> {
    /** Package-private no-argument constructor; performs no initialisation of its own. */
    GenericWriteAheadSinkTest() {}

    /**
     * Creates the sink under test.
     *
     * @return a freshly constructed {@link ListSink}
     * @throws Exception if the {@link ListSink} constructor fails
     */
    @Override
    protected ListSink createSink() throws Exception {
        final ListSink sink = new ListSink();
        return sink;
    }

    /**
     * Supplies the type information for the records used by this test.
     *
     * @return tuple type information built from a single {@code Integer} field
     */
    @Override
    protected TupleTypeInfo<Tuple1<Integer>> createTypeInfo() {
        // Varargs call; equivalent to passing a one-element Class array.
        return TupleTypeInfo.getBasicTupleTypeInfo(Integer.class);
    }

    /**
     * Wraps the given counter in a one-field tuple.
     *
     * @param counter value stored in the tuple's only field
     * @param checkpointID not used by this implementation
     * @return a new {@link Tuple1} holding {@code counter}
     */
    @Override
    protected Tuple1<Integer> generateValue(int counter, int checkpointID) {
        return new Tuple1<>(counter);
    }

    /**
     * Checks that the sink holds every ID from 1 to 60 and exactly 60 values in total.
     *
     * @param sink the sink whose collected {@code values} are inspected
     */
    @Override
    protected void verifyResultsIdealCircumstances(ListSink sink) {
        // Start with all expected IDs and strike off each one the sink received.
        final List<Integer> missing = new ArrayList<>();
        int id = 1;
        while (id <= 60) {
            missing.add(id);
            id++;
        }
        // The cast makes explicit that we remove by value, not by index.
        sink.values.forEach(seen -> missing.remove((Object) seen));

        final String missingDescription =
                "The following ID's where not found in the result list: " + missing;
        Assertions.assertThat(missing).as(missingDescription).isEmpty();

        final int surplus = sink.values.size() - 60;
        Assertions.assertThat(sink.values)
                .as("The sink emitted to many values: " + surplus)
                .hasSize(60);
    }

    /**
     * Checks that the sink holds every ID from 1 to 60 and exactly 60 values in total.
     *
     * @param sink the sink whose collected {@code values} are inspected
     */
    @Override
    protected void verifyResultsDataPersistenceUponMissedNotify(ListSink sink) {
        final int expectedCount = 60;

        // IDs still unaccounted for; every value the sink received is struck off below.
        final List<Integer> outstanding = new ArrayList<>();
        for (int id = 1; id <= expectedCount; id++) {
            outstanding.add(id);
        }
        for (final Integer received : sink.values) {
            // Integer argument selects remove(Object), i.e. removal by value.
            outstanding.remove(received);
        }

        Assertions.assertThat(outstanding)
                .as("The following ID's where not found in the result list: " + outstanding)
                .isEmpty();
        Assertions.assertThat(sink.values)
                .as("The sink emitted to many values: " + (sink.values.size() - expectedCount))
                .hasSize(expectedCount);
    }

    /**
     * Checks that the sink holds the IDs 1..20 and 41..60 and exactly 40 values in total.
     *
     * @param sink the sink whose collected {@code values} are inspected
     */
    @Override
    protected void verifyResultsDataDiscardingUponRestore(ListSink sink) {
        // Expected IDs are 1..20 followed by 41..60; the block 21..40 is not expected.
        final List<Integer> missing = new ArrayList<>();
        for (int id = 1; id <= 60; id++) {
            final boolean inSkippedRange = id > 20 && id < 41;
            if (!inSkippedRange) {
                missing.add(id);
            }
        }

        // Strike off every value the sink received (removal by value, not by index).
        for (final Integer received : sink.values) {
            missing.remove((Object) received);
        }

        Assertions.assertThat(missing)
                .as("The following ID's where not found in the result list: " + missing)
                .isEmpty();

        final int surplus = sink.values.size() - 40;
        Assertions.assertThat(sink.values)
                .as("The sink emitted to many values: " + surplus)
                .hasSize(40);
    }

    /**
     * Checks that the sink holds exactly the integers in the inclusive range
     * {@code [startElementCounter, endElementCounter]}, in any order.
     *
     * @param sink the sink whose collected {@code values} are inspected
     * @param startElementCounter first expected value (inclusive)
     * @param endElementCounter last expected value (inclusive)
     * @throws Exception declared by the overridden signature
     */
    @Override
    protected void verifyResultsWhenReScaling(ListSink sink, int startElementCounter, int endElementCounter) throws Exception {
        final List<Integer> expected = new ArrayList<>();
        int next = startElementCounter;
        while (next <= endElementCounter) {
            expected.add(next);
            ++next;
        }
        Assertions.assertThat(sink.values).containsExactlyInAnyOrderElementsOf(expected);
    }

    /**
     * Drives a {@link ListSink2} (backed by a committer that throws once from each of its two
     * operations) through three checkpoints and checks how many values reach the sink after
     * each completed-checkpoint notification.
     *
     * @throws Exception if the harness or the sink fails unexpectedly
     */
    @Test
    void testCommitterException() throws Exception {
        final ListSink2 sink = new ListSink2();
        final OneInputStreamOperatorTestHarness<Tuple1<Integer>, Tuple1<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(sink);
        testHarness.open();

        int nextElement = 1;

        // Checkpoint 0: ten elements.
        for (int sent = 0; sent < 10; sent++) {
            testHarness.processElement(new StreamRecord<>(this.generateValue(nextElement++, 0)));
        }
        testHarness.snapshot(0L, 0L);
        testHarness.notifyOfCompletedCheckpoint(0L);
        // Nothing has been handed to sendValues() yet.
        Assertions.assertThat(sink.values).isEmpty();

        // Checkpoint 1: eleven elements.
        for (int sent = 0; sent < 11; sent++) {
            testHarness.processElement(new StreamRecord<>(this.generateValue(nextElement++, 1)));
        }
        testHarness.snapshot(1L, 0L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        // Only ten values (the size of checkpoint 0) have been sent at this point.
        Assertions.assertThat(sink.values).hasSize(10);

        // Checkpoint 2: twelve elements.
        for (int sent = 0; sent < 12; sent++) {
            testHarness.processElement(new StreamRecord<>(this.generateValue(nextElement++, 2)));
        }
        testHarness.snapshot(2L, 0L);
        testHarness.notifyOfCompletedCheckpoint(2L);
        // 43 = 10 + 11 + 12 + 10: ten more than the 33 elements processed.
        // NOTE(review): presumably checkpoint 0 is sent twice because its first commit failed --
        // confirm against GenericWriteAheadSink.
        Assertions.assertThat(sink.values).hasSize(43);
    }

    public static class ListSink
    extends GenericWriteAheadSink<Tuple1<Integer>> {
        private static final long serialVersionUID = 1L;
        public List<Integer> values = new ArrayList<Integer>();

        public ListSink() throws Exception {
            super((CheckpointCommitter)new SimpleCommitter(), TypeExtractor.getForObject((Object)new Tuple1((Object)1)).createSerializer((SerializerConfig)new SerializerConfigImpl()), "job");
        }

        protected boolean sendValues(Iterable<Tuple1<Integer>> values, long checkpointId, long timestamp) throws Exception {
            for (Tuple1<Integer> value : values) {
                this.values.add((Integer)value.f0);
            }
            return true;
        }
    }

    public static class ListSink2
    extends GenericWriteAheadSink<Tuple1<Integer>> {
        private static final long serialVersionUID = 1L;
        public List<Integer> values = new ArrayList<Integer>();

        public ListSink2() throws Exception {
            super((CheckpointCommitter)new FailingCommitter(), TypeExtractor.getForObject((Object)new Tuple1((Object)1)).createSerializer((SerializerConfig)new SerializerConfigImpl()), "job");
        }

        protected boolean sendValues(Iterable<Tuple1<Integer>> values, long checkpointId, long timestamp) throws Exception {
            for (Tuple1<Integer> value : values) {
                this.values.add((Integer)value.f0);
            }
            return true;
        }
    }

    private static class FailingCommitter
    extends CheckpointCommitter {
        private static final long serialVersionUID = 1L;
        private List<Tuple2<Long, Integer>> checkpoints;
        private boolean failIsCommitted = true;
        private boolean failCommit = true;

        private FailingCommitter() {
        }

        public void open() throws Exception {
        }

        public void close() throws Exception {
        }

        public void createResource() throws Exception {
            this.checkpoints = new ArrayList<Tuple2<Long, Integer>>();
        }

        public void commitCheckpoint(int subtaskIdx, long checkpointID) {
            if (this.failCommit) {
                this.failCommit = false;
                throw new RuntimeException("Expected exception");
            }
            this.checkpoints.add((Tuple2<Long, Integer>)new Tuple2((Object)checkpointID, (Object)subtaskIdx));
        }

        public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) {
            if (this.failIsCommitted) {
                this.failIsCommitted = false;
                throw new RuntimeException("Expected exception");
            }
            return false;
        }
    }

    private static class SimpleCommitter
    extends CheckpointCommitter {
        private static final long serialVersionUID = 1L;
        private List<Tuple2<Long, Integer>> checkpoints;

        private SimpleCommitter() {
        }

        public void open() throws Exception {
        }

        public void close() throws Exception {
        }

        public void createResource() throws Exception {
            this.checkpoints = new ArrayList<Tuple2<Long, Integer>>();
        }

        public void commitCheckpoint(int subtaskIdx, long checkpointID) {
            this.checkpoints.add((Tuple2<Long, Integer>)new Tuple2((Object)checkpointID, (Object)subtaskIdx));
        }

        public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) {
            return this.checkpoints.contains(new Tuple2((Object)checkpointID, (Object)subtaskIdx));
        }
    }
}

