/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.ListSourceContext;
import org.apache.flink.streaming.api.functions.source.legacy.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;

class FromElementsFunctionTest {
    /** String fixture shared by several tests, in array form. */
    private static final String[] STRING_ARRAY_DATA = {"Oh", "boy", "what", "a", "show", "!"};

    /** The same fixture as a fixed-size list created from {@link #STRING_ARRAY_DATA}. */
    private static final List<String> STRING_LIST_DATA = Arrays.asList(STRING_ARRAY_DATA);

    /** Package-private no-argument constructor; the test class keeps no per-instance state. */
    FromElementsFunctionTest() {}

    /**
     * Runs a copy of {@code source} and collects everything the copy emits.
     *
     * <p>The function is first copied with {@link InstantiationUtil#clone}; only the copy is run,
     * never the instance that was passed in.
     *
     * @param source the source function to copy and run
     * @param <T> element type produced by the source
     * @return the list that the copy's {@link ListSourceContext} collected into
     * @throws Exception if copying or running the source fails
     */
    private static <T> List<T> runSource(FromElementsFunction<T> source) throws Exception {
        List<T> result = new ArrayList<>();
        FromElementsFunction<T> clonedSource = InstantiationUtil.clone(source);
        clonedSource.run(new ListSourceContext<>(result));
        return result;
    }

    /**
     * Runs a source built with an explicit String serializer directly (no copy is made) and
     * expects the collected output to match the input array exactly, in order.
     */
    @Test
    void testStrings() throws Exception {
        String[] data = {"Oh", "boy", "what", "a", "show", "!"};

        FromElementsFunction<String> source =
                new FromElementsFunction<>(
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()),
                        data);

        List<String> result = new ArrayList<>();
        source.run(new ListSourceContext<>(result));

        Assertions.assertThat(result).containsExactly(data);
    }

    /**
     * Constructing a source from elements that include {@code null} must fail with an
     * {@link IllegalArgumentException} whose message mentions the null element.
     */
    @Test
    void testNullElement() {
        Assertions.assertThatThrownBy(() -> new FromElementsFunction<String>("a", null, "b"))
                .hasMessageContaining("contains a null element")
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * A source created without a serializer reports {@code null} from {@code getSerializer()};
     * after {@code setOutputType} with the String type information it must hold a serializer
     * equal to the one that type information creates, and must emit the original elements.
     */
    @Test
    void testSetOutputTypeWithNoSerializer() throws Exception {
        FromElementsFunction<String> source = new FromElementsFunction<>(STRING_ARRAY_DATA);

        Assertions.assertThat(source.getSerializer()).isNull();

        source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());

        Assertions.assertThat(source.getSerializer())
                .isNotNull()
                .isEqualTo(
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()));

        List<String> result = runSource(source);

        Assertions.assertThat(result).containsExactly(STRING_ARRAY_DATA);
    }

    /**
     * Calling {@code setOutputType} with the type that matches the serializer given at
     * construction must leave the source with a serializer equal to the existing one, and the
     * source must still emit the original elements.
     */
    @Test
    void testSetOutputTypeWithSameSerializer() throws Exception {
        FromElementsFunction<String> source =
                new FromElementsFunction<>(
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()),
                        STRING_LIST_DATA);

        TypeSerializer<String> existingSerializer = source.getSerializer();

        source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());

        TypeSerializer<String> newSerializer = source.getSerializer();
        Assertions.assertThat(newSerializer).isEqualTo(existingSerializer);

        List<String> result = runSource(source);

        Assertions.assertThat(result).containsExactly(STRING_ARRAY_DATA);
    }

    /**
     * Setting an Integer output type on a source that holds Strings must be rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    void testSetOutputTypeWithIncompatibleType() {
        FromElementsFunction<String> source = new FromElementsFunction<>(STRING_LIST_DATA);

        // The raw cast is deliberate: it is what lets an Integer type reach a String-typed source.
        Assertions.assertThatThrownBy(
                        () ->
                                source.setOutputType(
                                        (TypeInformation) BasicTypeInfo.INT_TYPE_INFO,
                                        new ExecutionConfig()))
                .hasMessageContaining("not all subclasses of java.lang.Integer")
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Starts with the {@link ValueTypeInfo} serializer for {@link DeserializeTooMuchType}, then
     * sets a {@link GenericTypeInfo} output type. The serializer must change, and running the
     * source afterwards must yield exactly one {@link DeserializeTooMuchType} element.
     */
    @Test
    void testSetOutputTypeWithExistingBrokenSerializer() throws Exception {
        TypeInformation<DeserializeTooMuchType> info =
                new ValueTypeInfo<>(DeserializeTooMuchType.class);

        FromElementsFunction<DeserializeTooMuchType> source =
                new FromElementsFunction<DeserializeTooMuchType>(
                        info.createSerializer(new SerializerConfigImpl()),
                        new DeserializeTooMuchType());

        TypeSerializer<DeserializeTooMuchType> existingSerializer = source.getSerializer();

        source.setOutputType(
                new GenericTypeInfo<>(DeserializeTooMuchType.class), new ExecutionConfig());

        TypeSerializer<DeserializeTooMuchType> newSerializer = source.getSerializer();
        Assertions.assertThat(newSerializer).isNotEqualTo(existingSerializer);

        List<DeserializeTooMuchType> result = runSource(source);

        Assertions.assertThat(result)
                .hasSize(1)
                .first()
                .isInstanceOf(DeserializeTooMuchType.class);
    }

    /**
     * Calling {@code setOutputType} on a copy produced by {@link InstantiationUtil#clone} must
     * fail with an {@link IllegalStateException} saying the output type should have been
     * specified before the graph was shipped.
     */
    @Test
    void testSetOutputTypeAfterTransferred() throws Exception {
        FromElementsFunction<String> original = new FromElementsFunction<>(STRING_LIST_DATA);
        FromElementsFunction<String> source = InstantiationUtil.clone(original);

        Assertions.assertThatThrownBy(
                        () ->
                                source.setOutputType(
                                        BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig()))
                .hasMessageContaining(
                        "The output type should've been specified before shipping the graph to the cluster")
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Running a source that was never given a serializer must fail with an
     * {@link IllegalStateException} mentioning that the serializer is not configured.
     */
    @Test
    void testNoSerializer() {
        FromElementsFunction<String> source = new FromElementsFunction<>(STRING_LIST_DATA);

        Assertions.assertThatThrownBy(() -> runSource(source))
                .hasMessageContaining("serializer not configured")
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Elements of {@link MyPojo}, which does not implement {@code java.io.Serializable}, must
     * be emitted unchanged when the source is given the serializer that {@link TypeExtractor}
     * derives for the class.
     */
    @Test
    void testNonJavaSerializableType() throws Exception {
        MyPojo[] data = {new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};

        FromElementsFunction<MyPojo> source =
                new FromElementsFunction<>(
                        TypeExtractor.getForClass(MyPojo.class)
                                .createSerializer(new SerializerConfigImpl()),
                        data);

        List<MyPojo> result = runSource(source);

        Assertions.assertThat(result).containsExactly(data);
    }

    /**
     * Same fixture as the explicit-serializer {@link MyPojo} test, but the source is built
     * without a serializer and receives its type through {@code setOutputType} before it runs.
     */
    @Test
    void testNonJavaSerializableTypeWithSetOutputType() throws Exception {
        MyPojo[] data = {new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};

        FromElementsFunction<MyPojo> source = new FromElementsFunction<>(data);
        source.setOutputType(TypeExtractor.getForClass(MyPojo.class), new ExecutionConfig());

        List<MyPojo> result = runSource(source);

        Assertions.assertThat(result).containsExactly(data);
    }

    /**
     * Constructing a source from an element whose {@code write} throws must surface that
     * failure as an {@link IOException} carrying the element's "test exception" message.
     */
    @Test
    void testSerializationError() {
        TypeInformation<SerializationErrorType> info =
                new ValueTypeInfo<>(SerializationErrorType.class);

        // The construction is written inline rather than routed through a helper named after
        // a compiler-generated lambda method.
        Assertions.assertThatThrownBy(
                        () ->
                                new FromElementsFunction<SerializationErrorType>(
                                        info.createSerializer(new SerializerConfigImpl()),
                                        new SerializationErrorType()))
                .isInstanceOf(IOException.class)
                .hasMessageContaining("test exception");
    }

    /**
     * Running a source whose element type reads a long after writing only an int must fail
     * with an {@link IOException} that mentions user-defined serialization.
     */
    @Test
    void testDeSerializationError() throws Exception {
        TypeInformation<DeserializeTooMuchType> info =
                new ValueTypeInfo<>(DeserializeTooMuchType.class);

        FromElementsFunction<DeserializeTooMuchType> source =
                new FromElementsFunction<DeserializeTooMuchType>(
                        info.createSerializer(new SerializerConfigImpl()),
                        new DeserializeTooMuchType());

        Assertions.assertThatThrownBy(
                        () ->
                                source.run(
                                        new ListSourceContext<DeserializeTooMuchType>(
                                                new ArrayList<>())))
                .hasMessageContaining("user-defined serialization")
                .isInstanceOf(IOException.class);
    }

    /**
     * Snapshots a running source part-way through and resumes a fresh source from the snapshot.
     *
     * <p>The first source runs on a background thread. After a pause, a snapshot is taken
     * under the context's checkpoint lock and the output collected so far is copied in the
     * same critical section. The first source is then cancelled. A second source is initialised
     * from the snapshot and run against the copied output; the combined list must equal the
     * full input.
     */
    @Test
    void testCheckpointAndRestore() throws Exception {
        final int numElements = 10000;

        List<Integer> data = new ArrayList<>(numElements);
        List<Integer> result = new ArrayList<>(numElements);
        for (int i = 0; i < numElements; i++) {
            data.add(i);
        }

        final FromElementsFunction<Integer> source =
                new FromElementsFunction<>(IntSerializer.INSTANCE, data);
        StreamSource<Integer, FromElementsFunction<Integer>> src = new StreamSource<>(source);
        AbstractStreamOperatorTestHarness<Integer> testHarness =
                new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
        testHarness.open();

        final ListSourceContext<Integer> ctx = new ListSourceContext<>(result, 2L);

        // Run the source asynchronously so it can be snapshotted while it is still emitting.
        CheckedThread runner =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        source.run(ctx);
                    }
                };
        runner.start();

        // Give the source some time to emit before the snapshot is taken.
        Thread.sleep(1000L);

        // Snapshot and copy the output inside one critical section on the checkpoint lock.
        List<Integer> checkpointData = new ArrayList<>(numElements);
        OperatorSubtaskState handles;
        synchronized (ctx.getCheckpointLock()) {
            handles = testHarness.snapshot(566L, System.currentTimeMillis());
            checkpointData.addAll(result);
        }

        // Stop the first run and wait for its thread; sync() comes from CheckedThread.
        source.cancel();
        runner.sync();

        // Build a fresh source over the same data and initialise it from the snapshot.
        FromElementsFunction<Integer> sourceCopy =
                new FromElementsFunction<>(IntSerializer.INSTANCE, data);
        StreamSource<Integer, FromElementsFunction<Integer>> srcCopy =
                new StreamSource<>(sourceCopy);
        AbstractStreamOperatorTestHarness<Integer> testHarnessCopy =
                new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
        testHarnessCopy.setup();
        testHarnessCopy.initializeState(handles);
        testHarnessCopy.open();

        // Recovery run: append to the output captured at snapshot time.
        ListSourceContext<Integer> newCtx = new ListSourceContext<>(checkpointData);
        sourceCopy.run(newCtx);

        Assertions.assertThat(checkpointData).isEqualTo(data);
    }

    /**
     * Builds a {@link FromElementsFunction} over a single {@link SerializationErrorType}
     * element, using the serializer created from {@code info}. The constructed function is
     * discarded; the method exists only for what the construction does.
     *
     * <p>The method name follows the pattern of a compiler-generated (synthetic) lambda body.
     *
     * @param info type information from which the serializer is created
     * @throws Throwable whatever serializer creation or function construction throws
     */
    private static void lambda$testSerializationError$4(TypeInformation info) throws Throwable {
        TypeSerializer serializer = info.createSerializer(new SerializerConfigImpl());
        Object[] elements = new SerializationErrorType[] {new SerializationErrorType()};
        new FromElementsFunction(serializer, elements);
    }

    private static class DeserializeTooMuchType
    implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        private DeserializeTooMuchType() {
        }

        public void write(DataOutputView out) throws IOException {
            out.writeInt(42);
        }

        public void read(DataInputView in) throws IOException {
            in.readLong();
        }
    }

    /**
     * Mutable two-field holder with public fields and a public no-argument constructor. It
     * does not implement {@code java.io.Serializable}.
     *
     * <p>Two instances are equal when both fields match. The hash code is {@code val2} alone,
     * so equal instances always share a hash code.
     */
    private static class MyPojo {
        public long val1;
        public int val2;

        /** Creates an instance with both fields at their default value of zero. */
        public MyPojo() {}

        /**
         * @param val1 value stored in {@link #val1}
         * @param val2 value stored in {@link #val2}
         */
        public MyPojo(long val1, int val2) {
            this.val1 = val1;
            this.val2 = val2;
        }

        @Override
        public int hashCode() {
            return this.val2;
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so this guard also rejects a null argument.
            if (!(obj instanceof MyPojo)) {
                return false;
            }
            MyPojo that = (MyPojo) obj;
            return this.val1 == that.val1 && this.val2 == that.val2;
        }
    }

    private static class SerializationErrorType
    implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        private SerializationErrorType() {
        }

        public void write(DataOutputView out) throws IOException {
            throw new IOException("test exception");
        }

        public void read(DataInputView in) throws IOException {
            throw new IOException("test exception");
        }
    }
}

