/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.Assertions;

final class TestValuesRuntimeFunctions {
    static final Object LOCK = TestValuesTableFactory.class;
    private static final Map<String, Map<Integer, List<Row>>> globalRawResult = new HashMap<String, Map<Integer, List<Row>>>();
    private static final Map<String, Map<Integer, Map<Row, Row>>> globalUpsertResult = new HashMap<String, Map<Integer, Map<Row, Row>>>();
    private static final Map<String, Map<Integer, List<Row>>> globalRetractResult = new HashMap<String, Map<Integer, List<Row>>>();
    private static final Map<String, List<org.apache.flink.api.common.eventtime.Watermark>> watermarkHistory = new HashMap<String, List<org.apache.flink.api.common.eventtime.Watermark>>();
    private static final Map<String, List<BiConsumer<Integer, List<Row>>>> localRawResultsObservers = new HashMap<String, List<BiConsumer<Integer, List<Row>>>>();

    TestValuesRuntimeFunctions() {
    }

    static List<String> getRawResultsAsStrings(String tableName) {
        return TestValuesRuntimeFunctions.getRawResults(tableName).stream().map(TestValuesRuntimeFunctions::rowToString).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<Row> getRawResults(String tableName) {
        Object object = LOCK;
        synchronized (object) {
            if (globalRawResult.containsKey(tableName)) {
                return globalRawResult.get(tableName).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            }
        }
        return Collections.emptyList();
    }

    static List<String> getOnlyRawResultsAsStrings() {
        return TestValuesRuntimeFunctions.getOnlyRawResults().stream().map(TestValuesRuntimeFunctions::rowToString).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<Row> getOnlyRawResults() {
        Object object = LOCK;
        synchronized (object) {
            if (globalRawResult.size() != 1) {
                throw new IllegalStateException("Expected results for only one table to be present, but found " + globalRawResult.size());
            }
            return globalRawResult.values().iterator().next().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks(String tableName) {
        Object object = LOCK;
        synchronized (object) {
            if (watermarkHistory.containsKey(tableName)) {
                return new ArrayList<org.apache.flink.api.common.eventtime.Watermark>((Collection)watermarkHistory.get(tableName));
            }
            return Collections.emptyList();
        }
    }

    static List<String> getResultsAsStrings(String tableName) {
        return TestValuesRuntimeFunctions.getResults(tableName).stream().map(Row::toString).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<Row> getResults(String tableName) {
        Object object = LOCK;
        synchronized (object) {
            if (globalUpsertResult.containsKey(tableName)) {
                return globalUpsertResult.get(tableName).values().stream().flatMap(map -> map.values().stream()).collect(Collectors.toList());
            }
            if (globalRetractResult.containsKey(tableName)) {
                return globalRetractResult.get(tableName).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            }
            if (globalRawResult.containsKey(tableName)) {
                return TestValuesRuntimeFunctions.getRawResults(tableName);
            }
        }
        return Collections.emptyList();
    }

    static void registerLocalRawResultsObserver(String tableName, BiConsumer<Integer, List<Row>> observer) {
        localRawResultsObservers.computeIfAbsent(tableName, n -> new ArrayList()).add(observer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void clearResults() {
        Object object = LOCK;
        synchronized (object) {
            globalRawResult.clear();
            globalUpsertResult.clear();
            globalRetractResult.clear();
            watermarkHistory.clear();
            localRawResultsObservers.clear();
        }
    }

    static LineageVertex createLineageVertex(final String name, final String namespace) {
        return new LineageVertex(){

            public List<LineageDataset> datasets() {
                return Arrays.asList(new DefaultLineageDataset(name, namespace, new HashMap()));
            }
        };
    }

    private static String rowToString(Row row) {
        if (RowUtils.USE_LEGACY_TO_STRING) {
            return String.format("%s(%s)", row.getKind().shortString(), row);
        }
        return row.toString();
    }

    public static class TestNoLookupUntilNthAccessAsyncLookupFunction
    extends AsyncTestValueLookupFunction {
        private static final long serialVersionUID = 1L;
        private static Collection<RowData> emptyResult = Collections.emptyList();
        private final int lookupThreshold;
        private transient Map<RowData, Integer> accessCounter;

        public TestNoLookupUntilNthAccessAsyncLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection, int lookupThreshold) {
            super(data, lookupIndices, producedRowType, converter, generatedProjection);
            this.lookupThreshold = lookupThreshold;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            this.accessCounter = new HashMap<RowData, Integer>();
        }

        protected int counter(RowData key) {
            int currentCnt = this.accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
            this.accessCounter.put(key, currentCnt);
            return currentCnt;
        }

        @Override
        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
            int currentCnt = this.counter(keyRow);
            if (currentCnt <= this.lookupThreshold) {
                return CompletableFuture.supplyAsync(() -> emptyResult);
            }
            return super.asyncLookup(keyRow);
        }
    }

    public static class TestNoLookupUntilNthAccessLookupFunction
    extends TestValuesLookupFunction {
        private static final long serialVersionUID = 1L;
        private final int lookupThreshold;
        private transient Map<RowData, Integer> accessCounter;

        protected TestNoLookupUntilNthAccessLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection, int lookupThreshold) {
            super(data, lookupIndices, producedRowType, converter, generatedProjection);
            this.lookupThreshold = lookupThreshold;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            this.accessCounter = new HashMap<RowData, Integer>();
        }

        protected int counter(RowData key) {
            int currentCnt = this.accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
            this.accessCounter.put(key, currentCnt);
            return currentCnt;
        }

        @Override
        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            int currentCnt = this.counter(keyRow);
            if (currentCnt <= this.lookupThreshold) {
                return null;
            }
            return super.lookup(keyRow);
        }
    }

    public static class AsyncTestValueLookupFunction
    extends AsyncLookupFunction
    implements LineageVertexProvider {
        private static final String LINEAGE_NAMESPACE = "values://TestValuesLookupFunction";
        private static final long serialVersionUID = 1L;
        private final List<Row> data;
        private final int[] lookupIndices;
        private final RowType producedRowType;
        private final DynamicTableSource.DataStructureConverter converter;
        private final GeneratedProjection generatedProjection;
        private final boolean projectable;
        private final Random random;
        private transient boolean isOpenCalled = false;
        private transient ExecutorService executor;
        private transient Map<RowData, List<RowData>> indexedData;
        private transient Projection<RowData, GenericRowData> projection;
        private transient TypeSerializer<RowData> rowSerializer;

        protected AsyncTestValueLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection) {
            this.data = data;
            this.lookupIndices = lookupIndices;
            this.producedRowType = producedRowType;
            this.converter = converter;
            this.projectable = generatedProjection.isPresent();
            this.generatedProjection = generatedProjection.orElse(null);
            this.random = new Random();
        }

        public void open(FunctionContext context) throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.incrementAndGet();
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            if (this.projectable) {
                this.projection = (Projection)this.generatedProjection.newInstance(classLoader);
            }
            this.converter.open(RuntimeConverter.Context.create((ClassLoader)classLoader));
            this.rowSerializer = InternalSerializers.create((RowType)this.producedRowType);
            this.isOpenCalled = true;
            this.executor = Executors.newFixedThreadPool(2);
            this.indexDataByKey();
        }

        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
            Preconditions.checkArgument((boolean)this.isOpenCalled, (Object)"open() is not called.");
            for (int i = 0; i < keyRow.getArity(); ++i) {
                Preconditions.checkNotNull((Object)((GenericRowData)keyRow).getField(i), (String)String.format("Lookup key %s contains null value, which should not happen.", keyRow));
            }
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(this.random.nextInt(5));
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return this.indexedData.get(keyRow);
            }, this.executor);
        }

        public LineageVertex getLineageVertex() {
            return TestValuesRuntimeFunctions.createLineageVertex("", LINEAGE_NAMESPACE);
        }

        public void close() throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.decrementAndGet();
            if (this.executor != null && !this.executor.isShutdown()) {
                this.executor.shutdown();
            }
        }

        private void indexDataByKey() {
            this.indexedData = new HashMap<RowData, List<RowData>>();
            this.data.forEach(record -> {
                GenericRowData rowData = (GenericRowData)this.converter.toInternal(record);
                if (this.projectable) {
                    rowData = (GenericRowData)this.projection.apply((RowData)rowData);
                }
                Preconditions.checkNotNull((Object)rowData, (String)"Cannot convert record to internal GenericRowData type");
                GenericRowData key = GenericRowData.of((Object[])Arrays.stream(this.lookupIndices).mapToObj(arg_0 -> ((GenericRowData)rowData).getField(arg_0)).toArray());
                RowData copiedRow = (RowData)this.rowSerializer.copy((Object)rowData);
                List<RowData> list = this.indexedData.get(key);
                if (list != null) {
                    list.add(copiedRow);
                } else {
                    list = new ArrayList<RowData>();
                    list.add(copiedRow);
                    this.indexedData.put((RowData)key, list);
                }
            });
        }
    }

    public static class TestValuesLookupFunction
    extends LookupFunction
    implements LineageVertexProvider {
        private static final String LINEAGE_NAMESPACE = "values://TestValuesLookupFunction";
        private static final long serialVersionUID = 1L;
        private final List<Row> data;
        private final int[] lookupIndices;
        private final RowType producedRowType;
        private final DynamicTableSource.DataStructureConverter converter;
        private final GeneratedProjection generatedProjection;
        private final boolean projectable;
        private transient Map<RowData, List<RowData>> indexedData;
        private transient boolean isOpenCalled = false;
        private transient Projection<RowData, GenericRowData> projection;
        private transient TypeSerializer<RowData> rowSerializer;

        protected TestValuesLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection) {
            this.data = data;
            this.lookupIndices = lookupIndices;
            this.producedRowType = producedRowType;
            this.converter = converter;
            this.projectable = generatedProjection.isPresent();
            this.generatedProjection = generatedProjection.orElse(null);
        }

        public void open(FunctionContext context) throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.incrementAndGet();
            this.isOpenCalled = true;
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            if (this.projectable) {
                this.projection = (Projection)this.generatedProjection.newInstance(classLoader);
            }
            this.converter.open(RuntimeConverter.Context.create((ClassLoader)classLoader));
            this.rowSerializer = InternalSerializers.create((RowType)this.producedRowType);
            this.indexDataByKey();
        }

        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            Preconditions.checkArgument((boolean)this.isOpenCalled, (Object)"open() is not called.");
            for (int i = 0; i < keyRow.getArity(); ++i) {
                Preconditions.checkNotNull((Object)((GenericRowData)keyRow).getField(i), (String)String.format("Lookup key %s contains null value, which should not happen.", keyRow));
            }
            return this.indexedData.get(keyRow);
        }

        public LineageVertex getLineageVertex() {
            return TestValuesRuntimeFunctions.createLineageVertex("", LINEAGE_NAMESPACE);
        }

        public void close() throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.decrementAndGet();
        }

        private void indexDataByKey() {
            this.indexedData = new HashMap<RowData, List<RowData>>();
            this.data.forEach(record -> {
                GenericRowData rowData = (GenericRowData)this.converter.toInternal(record);
                if (this.projectable) {
                    rowData = (GenericRowData)this.projection.apply((RowData)rowData);
                }
                Preconditions.checkNotNull((Object)rowData, (String)"Cannot convert record to internal GenericRowData type");
                GenericRowData key = GenericRowData.of((Object[])Arrays.stream(this.lookupIndices).mapToObj(arg_0 -> ((GenericRowData)rowData).getField(arg_0)).toArray());
                RowData copiedRow = (RowData)this.rowSerializer.copy((Object)rowData);
                List<RowData> list = this.indexedData.get(key);
                if (list != null) {
                    list.add(copiedRow);
                } else {
                    list = new ArrayList<RowData>();
                    list.add(copiedRow);
                    this.indexedData.put((RowData)key, list);
                }
            });
        }
    }

    static class AppendingOutputFormat
    extends RichOutputFormat<RowData>
    implements LineageVertexProvider {
        private static final String LINEAGE_NAMESPACE = "values://AppendingOutputFormat";
        private static final long serialVersionUID = 1L;
        private final String tableName;
        private final DynamicTableSink.DataStructureConverter converter;
        protected transient List<Row> localRawResult;

        protected AppendingOutputFormat(String tableName, DynamicTableSink.DataStructureConverter converter) {
            this.tableName = tableName;
            this.converter = converter;
        }

        public void configure(Configuration parameters) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(OutputFormat.InitializationContext context) throws IOException {
            this.localRawResult = new ArrayList<Row>();
            Object object = LOCK;
            synchronized (object) {
                globalRawResult.computeIfAbsent(this.tableName, k -> new HashMap()).put(context.getTaskNumber(), this.localRawResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void writeRecord(RowData value) throws IOException {
            if (value.getRowKind() == RowKind.INSERT) {
                Row row = (Row)this.converter.toExternal((Object)value);
                Assertions.assertThat((Object)row).isNotNull();
                Object object = LOCK;
                synchronized (object) {
                    this.localRawResult.add(row);
                    Optional.ofNullable(localRawResultsObservers.get(this.tableName)).orElse(Collections.emptyList()).forEach(c -> c.accept(this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.localRawResult));
                }
            } else {
                throw new RuntimeException("AppendingOutputFormat received " + value.getRowKind() + " messages.");
            }
        }

        public LineageVertex getLineageVertex() {
            return TestValuesRuntimeFunctions.createLineageVertex(this.tableName, LINEAGE_NAMESPACE);
        }

        public void close() throws IOException {
        }
    }

    static class RetractingSinkFunction
    extends AbstractExactlyOnceSink {
        private static final String LINEAGE_NAMESPACE = "values://RetractingSinkFunction";
        private static final long serialVersionUID = 1L;
        protected transient ListState<Row> retractResultState;
        protected transient List<Row> localRetractResult;

        protected RetractingSinkFunction(String tableName, DataType consumedDataType, DynamicTableSink.DataStructureConverter converter) {
            super(tableName, consumedDataType, converter);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            super.initializeState(context);
            this.retractResultState = context.getOperatorStateStore().getListState(new ListStateDescriptor("sink-retract-results", (TypeSerializer)ExternalSerializer.of((DataType)this.consumedDataType)));
            this.localRetractResult = new ArrayList<Row>();
            if (context.isRestored()) {
                for (Row value : (Iterable)this.retractResultState.get()) {
                    this.localRetractResult.add(value);
                }
            }
            int taskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            Object object = LOCK;
            synchronized (object) {
                globalRetractResult.computeIfAbsent(this.tableName, k -> new HashMap()).put(taskId, this.localRetractResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            super.snapshotState(context);
            Object object = LOCK;
            synchronized (object) {
                this.retractResultState.update(this.localRetractResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            RowKind kind = value.getRowKind();
            Row row = (Row)this.converter.toExternal((Object)value);
            Assertions.assertThat((Object)row).isNotNull();
            Object object = LOCK;
            synchronized (object) {
                Row retractRow = Row.copy((Row)row);
                retractRow.setKind(RowKind.INSERT);
                if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
                    this.localRetractResult.add(retractRow);
                } else {
                    boolean contains = this.localRetractResult.remove(retractRow);
                    if (!contains) {
                        throw new RuntimeException("Tried to retract a value that wasn't inserted first. This is probably an incorrectly implemented test.");
                    }
                }
                this.addLocalRawResult(row);
            }
        }

        @Override
        public String getNamespace() {
            return LINEAGE_NAMESPACE;
        }
    }

    static class KeyedUpsertingSinkFunction
    extends AbstractExactlyOnceSink {
        private static final String LINEAGE_NAMESPACE = "values://KeyedUpsertingSinkFunction";
        private static final long serialVersionUID = 1L;
        private final int[] keyIndices;
        private final int[] targetColumnIndices;
        private final int expectedSize;
        private final int totalColumns;
        private transient Map<Row, Row> localUpsertResult;
        private transient int receivedNum;

        protected KeyedUpsertingSinkFunction(String tableName, DataType consumedDataType, DynamicTableSink.DataStructureConverter converter, int[] keyIndices, int[] targetColumnIndices, int expectedSize, int totalColumns) {
            super(tableName, consumedDataType, converter);
            this.keyIndices = keyIndices;
            this.targetColumnIndices = targetColumnIndices;
            this.expectedSize = expectedSize;
            this.totalColumns = totalColumns;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            super.initializeState(context);
            Object object = LOCK;
            synchronized (object) {
                this.localUpsertResult = globalUpsertResult.computeIfAbsent(this.tableName, k -> new HashMap()).computeIfAbsent(0, k -> new HashMap());
                globalRawResult.computeIfAbsent(this.tableName, k -> new HashMap()).values().stream().flatMap(Collection::stream).forEach(row -> {
                    boolean isDelete = row.getKind() == RowKind.DELETE || row.getKind() == RowKind.UPDATE_BEFORE;
                    Row key = Row.project((Row)row, (int[])this.keyIndices);
                    key.setKind(RowKind.INSERT);
                    if (isDelete) {
                        this.localUpsertResult.remove(key);
                    } else {
                        Row upsertRow = Row.copy((Row)row);
                        upsertRow.setKind(RowKind.INSERT);
                        this.localUpsertResult.put(key, upsertRow);
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            RowKind kind = value.getRowKind();
            Row row = (Row)this.converter.toExternal((Object)value);
            Assertions.assertThat((Object)row).isNotNull();
            Object object = LOCK;
            synchronized (object) {
                Row key = Row.project((Row)row, (int[])this.keyIndices);
                key.setKind(RowKind.INSERT);
                Row upsertRow = Row.copy((Row)row);
                upsertRow.setKind(RowKind.INSERT);
                if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
                    if (this.targetColumnIndices.length > 0) {
                        this.localUpsertResult.compute(key, (entryKey, currentValue) -> this.updateRowValue((Row)currentValue, upsertRow, this.targetColumnIndices));
                    } else {
                        this.localUpsertResult.put(key, upsertRow);
                    }
                } else {
                    Row oldValue = this.localUpsertResult.remove(key);
                    if (oldValue == null) {
                        throw new RuntimeException("Tried to delete a value that wasn't inserted first. This is probably an incorrectly implemented test.");
                    }
                }
                this.addLocalRawResult(row);
                ++this.receivedNum;
                if (this.expectedSize != -1 && this.receivedNum == this.expectedSize) {
                    throw new SuccessException();
                }
            }
        }

        private Row updateRowValue(Row old, Row newRow, int[] targetColumnIndices) {
            if (old == null) {
                return newRow;
            }
            assert (old.getArity() == this.totalColumns);
            for (int idx : targetColumnIndices) {
                old.setField(idx, newRow.getField(idx));
            }
            return old;
        }

        @Override
        public String getNamespace() {
            return LINEAGE_NAMESPACE;
        }
    }

    static class AppendingSinkFunction
    extends AbstractExactlyOnceSink {
        private static final String LINEAGE_NAMESPACE = "values://AppendingSinkFunction";
        private static final long serialVersionUID = 1L;
        private final int rowtimeIndex;

        protected AppendingSinkFunction(String tableName, DataType consumedDataType, DynamicTableSink.DataStructureConverter converter, int rowtimeIndex) {
            super(tableName, consumedDataType, converter);
            this.rowtimeIndex = rowtimeIndex;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            if (value.getRowKind() == RowKind.INSERT) {
                if (this.rowtimeIndex >= 0) {
                    TimestampData rowtime = value.getTimestamp(this.rowtimeIndex, 3);
                    long mark = context.currentWatermark();
                    if (mark > rowtime.getMillisecond()) {
                        return;
                    }
                }
                Object object = LOCK;
                synchronized (object) {
                    this.addLocalRawResult((Row)this.converter.toExternal((Object)value));
                }
            } else {
                throw new RuntimeException("AppendingSinkFunction received " + value.getRowKind() + " messages.");
            }
        }

        @Override
        public String getNamespace() {
            return LINEAGE_NAMESPACE;
        }
    }

    private static abstract class AbstractExactlyOnceSink
    extends RichSinkFunction<RowData>
    implements CheckpointedFunction,
    LineageVertexProvider {
        private static final long serialVersionUID = 1L;
        protected final String tableName;
        protected final DataType consumedDataType;
        protected final DynamicTableSink.DataStructureConverter converter;
        protected transient ListState<Row> rawResultState;
        protected transient List<Row> localRawResult;

        protected AbstractExactlyOnceSink(String tableName, DataType consumedDataType, DynamicTableSink.DataStructureConverter converter) {
            this.tableName = tableName;
            this.consumedDataType = consumedDataType;
            this.converter = converter;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.rawResultState = context.getOperatorStateStore().getListState(new ListStateDescriptor("sink-results", (TypeSerializer)ExternalSerializer.of((DataType)this.consumedDataType)));
            this.localRawResult = new ArrayList<Row>();
            if (context.isRestored()) {
                for (Row value : (Iterable)this.rawResultState.get()) {
                    this.localRawResult.add(value);
                }
            }
            int taskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            Object object = LOCK;
            synchronized (object) {
                globalRawResult.computeIfAbsent(this.tableName, k -> new HashMap()).put(taskId, this.localRawResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            Object object = LOCK;
            synchronized (object) {
                this.rawResultState.update(this.localRawResult);
            }
        }

        public LineageVertex getLineageVertex() {
            return TestValuesRuntimeFunctions.createLineageVertex(this.tableName, this.getNamespace());
        }

        abstract String getNamespace();

        protected void addLocalRawResult(Row row) {
            this.localRawResult.add(row);
            Optional.ofNullable(localRawResultsObservers.get(this.tableName)).orElse(Collections.emptyList()).forEach(c -> c.accept(this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.localRawResult));
        }
    }

    public static class FromElementSourceFunctionWithWatermark
    implements SourceFunction<RowData>,
    LineageVertexProvider {
        private static final String LINEAGE_NAMESPACE = "values://FromElementSourceFunctionWithWatermark";
        private final TypeSerializer<RowData> serializer;
        private final byte[] elementsSerialized;
        private final int numElements;
        private volatile int numElementsEmitted;
        private final WatermarkStrategy<RowData> watermarkStrategy;
        private volatile boolean isRunning = true;
        private final String tableName;

        public FromElementSourceFunctionWithWatermark(String tableName, TypeSerializer<RowData> serializer, Iterable<RowData> elements, WatermarkStrategy<RowData> watermarkStrategy) throws IOException {
            this.tableName = tableName;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper((OutputStream)baos);
            int count = 0;
            try {
                for (RowData element : elements) {
                    serializer.serialize((Object)element, (DataOutputView)wrapper);
                    ++count;
                }
            }
            catch (Exception e) {
                throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
            }
            this.numElements = count;
            this.elementsSerialized = baos.toByteArray();
            this.watermarkStrategy = watermarkStrategy;
            this.serializer = serializer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
            ByteArrayInputStream bais = new ByteArrayInputStream(this.elementsSerialized);
            DataInputViewStreamWrapper input = new DataInputViewStreamWrapper((InputStream)bais);
            WatermarkGenerator generator = this.watermarkStrategy.createWatermarkGenerator(new WatermarkGeneratorSupplier.Context(){

                public MetricGroup getMetricGroup() {
                    return null;
                }

                public RelativeClock getInputActivityClock() {
                    return SystemClock.getInstance();
                }
            });
            TestValuesWatermarkOutput output = new TestValuesWatermarkOutput(ctx);
            Object lock = ctx.getCheckpointLock();
            while (this.isRunning && this.numElementsEmitted < this.numElements) {
                RowData next;
                try {
                    next = (RowData)this.serializer.deserialize((DataInputView)input);
                    generator.onEvent((Object)next, Long.MIN_VALUE, (WatermarkOutput)output);
                    generator.onPeriodicEmit((WatermarkOutput)output);
                }
                catch (Exception e) {
                    throw new IOException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + this.serializer, e);
                }
                Object object = lock;
                synchronized (object) {
                    ctx.collect((Object)next);
                    ++this.numElementsEmitted;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public LineageVertex getLineageVertex() {
            return new SourceLineageVertex(){

                public Boundedness boundedness() {
                    return Boundedness.BOUNDED;
                }

                public List<LineageDataset> datasets() {
                    return Arrays.asList(new DefaultLineageDataset(tableName, FromElementSourceFunctionWithWatermark.LINEAGE_NAMESPACE, new HashMap()));
                }
            };
        }

        private class TestValuesWatermarkOutput
        implements WatermarkOutput {
            SourceFunction.SourceContext<RowData> ctx;

            public TestValuesWatermarkOutput(SourceFunction.SourceContext<RowData> ctx) {
                this.ctx = ctx;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
                this.ctx.emitWatermark(new Watermark(watermark.getTimestamp()));
                Object object = LOCK;
                synchronized (object) {
                    watermarkHistory.computeIfAbsent(FromElementSourceFunctionWithWatermark.this.tableName, k -> new LinkedList()).add(watermark);
                }
            }

            public void markIdle() {
            }

            public void markActive() {
            }
        }
    }
}

