/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.utils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.legacy.io.CollectionInputFormat;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.legacy.connector.source.TableFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class TestCollectionTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    /**
     * Passed to {@code getCollectionSource} by {@code createDynamicTableSource}; the scan
     * provider built from it reports itself as bounded when this flag is {@code false}.
     */
    public static boolean isStreaming = true;

    /** Rows emitted by the scan source; appended to by {@code initData}, emptied by {@code reset}. */
    private static final LinkedList<Row> SOURCE_DATA = new LinkedList<>();

    /** Rows searched by the lookup function; appended to by {@code initData}, emptied by {@code reset}. */
    private static final LinkedList<Row> DIM_DATA = new LinkedList<>();

    /** Rows recorded by the collection sink; emptied by {@code reset}. */
    private static final LinkedList<Row> RESULT = new LinkedList<>();

    /**
     * Pause in milliseconds handed to every source created by {@code getCollectionSource}. The
     * input format only sleeps when the value is positive, so {@code -1} means "no pause".
     */
    private static long emitIntervalMS = -1L;

    /**
     * Adds rows to the shared source and dimension collections and sets the emit interval.
     *
     * <p>Rows are appended; anything registered earlier stays in place until {@code reset()} is
     * called.
     *
     * @param sourceData rows to append to the scan-source data
     * @param dimData rows to append to the lookup (dimension) data
     * @param emitInterval pause in milliseconds between emitted records, or {@code null} to
     *     store {@code -1}
     */
    public static void initData(List<Row> sourceData, List<Row> dimData, Long emitInterval) {
        SOURCE_DATA.addAll(sourceData);
        DIM_DATA.addAll(dimData);
        if (emitInterval != null) {
            emitIntervalMS = emitInterval.longValue();
        } else {
            emitIntervalMS = -1L;
        }
    }

    /**
     * Restores the factory's shared state: the emit interval goes back to {@code -1} and the
     * source, dimension and result collections are emptied.
     */
    public static void reset() {
        emitIntervalMS = -1L;
        SOURCE_DATA.clear();
        DIM_DATA.clear();
        RESULT.clear();
    }

    /**
     * Builds a {@link CollectionTableSource} for the given catalog table.
     *
     * <p>The table option {@code "parallelism"} is optional; when present it is parsed as an
     * integer and later applied to the produced data stream.
     *
     * @param catalogTable table whose options and resolved schema configure the source
     * @param isStreaming whether the source should report itself as unbounded
     * @return a source using the currently configured emit interval
     * @throws NumberFormatException if the {@code "parallelism"} option is not a valid integer
     */
    public static CollectionTableSource getCollectionSource(ResolvedCatalogTable catalogTable, boolean isStreaming) {
        String parallelismProp = catalogTable.getOptions().get("parallelism");
        // Typed as Optional<Integer> so it matches the CollectionTableSource constructor.
        Optional<Integer> parallelism = parallelismProp == null
                ? Optional.empty()
                : Optional.of(Integer.parseInt(parallelismProp));
        return new CollectionTableSource(
                emitIntervalMS,
                catalogTable.getResolvedSchema().toSourceRowDataType(),
                isStreaming,
                parallelism);
    }

    /**
     * Builds a {@link CollectionTableSink} whose output type is the sink row data type of the
     * table's resolved schema.
     *
     * @param catalogTable table whose resolved schema defines the sink's row type
     * @return a new sink instance
     */
    public static CollectionTableSink getCollectionSink(ResolvedCatalogTable catalogTable) {
        final CollectionTableSink sink =
                new CollectionTableSink(catalogTable.getResolvedSchema().toSinkRowDataType());
        return sink;
    }

    /**
     * Creates the table source for the catalog table carried by {@code context}, using the
     * current value of the static {@code isStreaming} flag.
     *
     * @param context factory context supplying the catalog table
     * @return the source built by {@code getCollectionSource}
     */
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        final DynamicTableSource source =
                TestCollectionTableFactory.getCollectionSource(context.getCatalogTable(), isStreaming);
        return source;
    }

    /**
     * Returns the identifier of this factory.
     *
     * @return the constant {@code "COLLECTION"}
     */
    public String factoryIdentifier() {
        final String identifier = "COLLECTION";
        return identifier;
    }

    /**
     * Returns the options this factory requires.
     *
     * @return a new, empty, mutable set: no option is required
     */
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    /**
     * Returns the optional options this factory declares.
     *
     * @return a new, empty, mutable set: no optional option is declared
     */
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    /**
     * Creates the table sink for the catalog table carried by {@code context}.
     *
     * @param context factory context supplying the catalog table
     * @return the sink built by {@code getCollectionSink}
     */
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        final DynamicTableSink sink =
                TestCollectionTableFactory.getCollectionSink(context.getCatalogTable());
        return sink;
    }

    /**
     * Table source backed by the factory's static collections: scanning emits the rows of
     * {@code SOURCE_DATA}, lookups are answered from {@code DIM_DATA}.
     */
    static class CollectionTableSource
    implements ScanTableSource,
    LookupTableSource {
        /** Pause in milliseconds handed to the input format; unboxed when the format is created. */
        private final Long emitIntervalMS;
        /** Row data type produced by this source. */
        private final DataType rowType;
        /** When {@code false}, the scan provider reports itself as bounded. */
        private final boolean isStreaming;
        /** Parallelism to set on the produced stream, if one was configured. */
        private final Optional<Integer> parallelism;

        /**
         * @param emitIntervalMS pause in milliseconds between emitted records
         * @param rowType row data type produced by the source
         * @param isStreaming whether the source is unbounded
         * @param parallelism explicit parallelism for the produced stream, or empty to keep the default
         */
        public CollectionTableSource(Long emitIntervalMS, DataType rowType, boolean isStreaming, Optional<Integer> parallelism) {
            this.emitIntervalMS = emitIntervalMS;
            this.rowType = rowType;
            this.isStreaming = isStreaming;
            this.parallelism = parallelism;
        }

        /** Returns a new source configured with the same four settings as this one. */
        public DynamicTableSource copy() {
            return new CollectionTableSource(this.emitIntervalMS, this.rowType, this.isStreaming, this.parallelism);
        }

        /** Returns the fixed summary string {@code "CollectionTableSource"}. */
        public String asSummaryString() {
            return "CollectionTableSource";
        }

        /**
         * Creates a lookup provider that searches {@code DIM_DATA}.
         *
         * <p>Only the first index of each key path returned by {@code context.getKeys()} is used.
         */
        public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
            int[] lookupIndices = Arrays.stream(context.getKeys()).mapToInt(k -> k[0]).toArray();
            return TableFunctionProvider.of(new TemporalTableFetcher(DIM_DATA, lookupIndices));
        }

        /** This source only produces insert records. */
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        /**
         * Creates a scan provider over the rows held in {@code SOURCE_DATA} at the time of this call.
         *
         * <p>The rows are converted to {@link RowData} here, so the provider works on that
         * converted list from then on.
         */
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            final TypeInformation<RowData> type = runtimeProviderContext.createTypeInformation(this.rowType);
            final TypeSerializer<RowData> serializer = type.createSerializer(new SerializerConfigImpl());
            final DynamicTableSource.DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(this.rowType);
            final List<RowData> rowData = SOURCE_DATA.stream()
                    .map(row -> (RowData) converter.toInternal(row))
                    .collect(Collectors.toList());
            return new DataStreamScanProvider() {

                public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
                    DataStreamSource<RowData> dataStream = execEnv.createInput(
                            new TestCollectionInputFormat<>(emitIntervalMS, rowData, serializer), type);
                    // Only override the parallelism when the table options supplied one.
                    parallelism.ifPresent(dataStream::setParallelism);
                    return dataStream;
                }

                public boolean isBounded() {
                    return !isStreaming;
                }
            };
        }
    }

    /**
     * Table sink that hands every incoming record to an {@link UnsafeMemorySinkFunction}, which
     * stores it in the factory's static result list.
     */
    static class CollectionTableSink
    implements DynamicTableSink {
        /** Data type of the rows consumed by this sink. */
        private final DataType outputType;

        /**
         * @param outputType data type of the rows consumed by this sink
         */
        public CollectionTableSink(DataType outputType) {
            this.outputType = outputType;
        }

        /** Returns the requested changelog mode unchanged. */
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        /**
         * Creates a provider that attaches an {@link UnsafeMemorySinkFunction} to the incoming
         * stream and sets the sink's parallelism to 1.
         */
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            final TypeInformation<Row> typeInformation = context.createTypeInformation(this.outputType);
            final DynamicTableSink.DataStructureConverter converter = context.createDataStructureConverter(this.outputType);
            return new DataStreamSinkProvider() {

                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    return dataStream
                            .addSink(new UnsafeMemorySinkFunction(typeInformation, converter))
                            .setParallelism(1);
                }
            };
        }

        /** Returns a new sink with the same output type. */
        public DynamicTableSink copy() {
            return new CollectionTableSink(this.outputType);
        }

        /** Returns {@code "CollectionTableSink(<outputType>)"}. */
        public String asSummaryString() {
            return String.format("CollectionTableSink(%s)", this.outputType);
        }
    }

    /**
     * Sink function that converts each incoming {@link RowData} to a {@link Row} and appends a
     * serializer-made copy of it to the factory's static {@code RESULT} list.
     */
    static class UnsafeMemorySinkFunction
    extends RichSinkFunction<RowData> {
        /** Created in {@link #open(OpenContext)}; used to copy each row before it is stored. */
        private TypeSerializer<Row> serializer;
        /** Type information from which the serializer is created. */
        private final TypeInformation<Row> outputType;
        /** Converts internal records to their external representation. */
        private final DynamicTableSink.DataStructureConverter converter;

        /**
         * @param outputType type information of the external rows
         * @param converter converter from the internal to the external representation
         */
        public UnsafeMemorySinkFunction(TypeInformation<Row> outputType, DynamicTableSink.DataStructureConverter converter) {
            this.converter = converter;
            this.outputType = outputType;
        }

        /** Creates the row serializer from {@code outputType} using a fresh serializer config. */
        public void open(OpenContext openContext) throws Exception {
            final SerializerConfig config = new SerializerConfigImpl();
            this.serializer = this.outputType.createSerializer(config);
        }

        /** Converts {@code value} to a {@link Row}, copies it and appends the copy to {@code RESULT}. */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            final Row external = (Row) this.converter.toExternal(value);
            final Row snapshot = this.serializer.copy(external);
            RESULT.add(snapshot);
        }
    }

    /**
     * Table function that scans the given dimension rows and emits a copy of every row whose key
     * fields equal the lookup values.
     */
    static class TemporalTableFetcher
    extends TableFunction<Row> {
        /** Rows searched on every {@code eval} call. */
        private final LinkedList<Row> dimData;
        /** Field positions in a dimension row; entry {@code i} is compared with lookup value {@code i}. */
        private final int[] keyIndices;

        /**
         * @param dimData rows to search
         * @param keyes field positions of the lookup keys inside a dimension row
         */
        public TemporalTableFetcher(LinkedList<Row> dimData, int[] keyes) {
            this.dimData = dimData;
            this.keyIndices = keyes;
        }

        /**
         * Emits a field-by-field copy of each dimension row whose key fields match {@code values}.
         *
         * @param values lookup values, positionally aligned with the configured key indices
         */
        public void eval(Object ... values) {
            for (Row candidate : this.dimData) {
                if (this.matchesKeys(candidate, values)) {
                    this.collect(TemporalTableFetcher.copyFields(candidate));
                }
            }
        }

        /** Returns {@code true} when every key field of {@code candidate} equals its lookup value; two nulls count as equal. */
        private boolean matchesKeys(Row candidate, Object[] values) {
            for (int pos = 0; pos < this.keyIndices.length; pos++) {
                Object dimField = candidate.getField(this.keyIndices[pos]);
                Object inputField = values[pos];
                if (dimField == null) {
                    if (inputField != null) {
                        return false;
                    }
                } else if (!dimField.equals(inputField)) {
                    return false;
                }
            }
            return true;
        }

        /** Builds a new row of the same arity as {@code source}, holding the same field values. */
        private static Row copyFields(Row source) {
            int arity = source.getArity();
            Row duplicate = new Row(arity);
            for (int pos = 0; pos < arity; pos++) {
                duplicate.setField(pos, source.getField(pos));
            }
            return duplicate;
        }
    }

    /**
     * Collection input format that can pause before each end-of-input check, which slows down
     * how quickly records are handed out.
     *
     * @param <T> element type of the collection
     */
    static class TestCollectionInputFormat<T>
    extends CollectionInputFormat<T> {
        /** Pause in milliseconds before each {@link #reachedEnd()} check; ignored unless positive. */
        private final long emitIntervalMs;

        /**
         * @param emitIntervalMs pause in milliseconds before each end check; non-positive disables it
         * @param dataSet elements to emit
         * @param serializer serializer for the elements
         */
        public TestCollectionInputFormat(long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) {
            super(dataSet, serializer);
            this.emitIntervalMs = emitIntervalMs;
        }

        /**
         * Sleeps for the configured interval (when positive) and then delegates to the parent.
         *
         * <p>An interruption during the sleep does not fail the call, but the thread's interrupt
         * flag is restored so the interruption stays visible to the caller.
         *
         * @return the result of the parent's end check
         * @throws IOException if the parent's end check fails
         */
        public boolean reachedEnd() throws IOException {
            if (this.emitIntervalMs > 0L) {
                try {
                    Thread.sleep(this.emitIntervalMs);
                }
                catch (InterruptedException interrupted) {
                    // Still best-effort: skip the rest of the pause, but do not lose the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
            return super.reachedEnd();
        }
    }
}

