/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table.fullcache.inputformat;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.functions.table.fullcache.TestCacheLoader;
import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests for {@link InputFormatCacheLoader}: loading the cache with different split counts,
 * the metrics reported around a reload, a reload that fails, and closing the loader while a
 * reload is still in progress.
 *
 * <p>Every test starts with {@link FullCacheTestInputFormat#OPEN_CLOSED_COUNTER} reset to zero
 * and ends by asserting that the counter is back at zero.
 */
class InputFormatCacheLoaderTest {
    /** Number of splits requested from the test input format unless a test parameterizes it. */
    private static final int DEFAULT_NUM_SPLITS = 2;

    /** Split-count delta handed to the test input format unless a test parameterizes it. */
    private static final int DEFAULT_DELTA_NUM_SPLITS = 0;

    /** Resets the shared open/close counter of the test input format before each test. */
    @BeforeEach
    void resetCounter() {
        FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0);
    }

    /** Asserts after each test that the shared open/close counter is back at zero. */
    @AfterEach
    void checkCounter() {
        Assertions.assertThat(FullCacheTestInputFormat.OPEN_CLOSED_COUNTER).hasValue(0);
    }

    /**
     * Reloads the cache twice and checks its content, that the second reload installs a new cache
     * instance, and that the cache is empty once the loader has been closed.
     *
     * @param deltaNumSplits split-count delta forwarded to the test input format
     */
    @ParameterizedTest
    @MethodSource("deltaNumSplits")
    void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
        Map<RowData, Collection<RowData>> cache;
        try (InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits)) {
            cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());

            reloadSynchronously(cacheLoader);
            cache = cacheLoader.getCache();
            assertCacheContent(cache);

            reloadSynchronously(cacheLoader);
            Assertions.assertThat(cacheLoader.getCache())
                    .as("A new instance of cache should be present after reload.")
                    .isNotSameAs(cache);
            // Keep a reference to the latest cache so it can be inspected after close().
            cache = cacheLoader.getCache();
        }
        Assertions.assertThat(cache.size()).as("Cache should be cleared after close.").isZero();
    }

    /**
     * Checks which cache metrics are registered by the loader, their values before any reload,
     * and their values after one successful reload.
     */
    @Test
    void testCacheMetrics() throws Exception {
        try (InputFormatCacheLoader cacheLoader = createCacheLoader(DEFAULT_DELTA_NUM_SPLITS)) {
            InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
            cacheLoader.initializeMetrics(metricGroup);

            // Metrics the loader is expected to register, with their pre-load values.
            Assertions.assertThat(metricGroup.loadCounter).isNotNull();
            Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(metricGroup.numLoadFailuresCounter).isNotNull();
            Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(metricGroup.numCachedRecordsGauge).isNotNull();
            Assertions.assertThat(metricGroup.numCachedRecordsGauge.getValue()).isEqualTo(0L);
            Assertions.assertThat(metricGroup.latestLoadTimeGauge).isNotNull();
            Assertions.assertThat(metricGroup.latestLoadTimeGauge.getValue()).isEqualTo(-1L);

            // Metrics the loader is expected to leave unregistered.
            Assertions.assertThat(metricGroup.hitCounter).isNull();
            Assertions.assertThat(metricGroup.missCounter).isNull();
            Assertions.assertThat(metricGroup.numCachedBytesGauge).isNull();

            reloadSynchronously(cacheLoader);

            Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1L);
            Assertions.assertThat(metricGroup.latestLoadTimeGauge.getValue()).isNotEqualTo(-1L);
            Assertions.assertThat(metricGroup.numCachedRecordsGauge.getValue())
                    .isEqualTo((long) TestCacheLoader.DATA.size());
        }
    }

    /**
     * Makes the key projection throw during a reload and checks that the exception is the root
     * cause of the reload failure, that the load counter stays at zero and that the failure
     * counter is incremented once.
     */
    @Test
    void testExceptionDuringReload() throws Exception {
        RuntimeException exception = new RuntimeException("Load failed.");
        Runnable reloadAction =
                () -> {
                    throw exception;
                };
        try (InputFormatCacheLoader cacheLoader =
                createCacheLoader(DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
            InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
            cacheLoader.initializeMetrics(metricGroup);

            Assertions.assertThatThrownBy(() -> reloadSynchronously(cacheLoader))
                    .hasRootCause(exception);

            Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1L);
        }
    }

    /**
     * Closes the loader while a reload is blocked inside the key projection and checks that the
     * reload future is done afterwards and that neither the load counter nor the failure counter
     * was incremented.
     *
     * @param numSplits number of splits the test input format is created with
     */
    @ParameterizedTest
    @MethodSource("numSplits")
    void testCloseDuringReload(int numSplits) throws Exception {
        OneShotLatch reloadLatch = new OneShotLatch();
        Runnable reloadAction =
                () -> {
                    // Signal the test thread that the reload has reached the projection.
                    reloadLatch.trigger();
                    // Block on a latch nobody triggers; only an interrupt can end the wait.
                    Assertions.assertThatThrownBy(() -> new OneShotLatch().await())
                            .as("Wait should be interrupted if everything works ok")
                            .isInstanceOf(InterruptedException.class);
                    // Restore the interrupted status consumed by the await() above.
                    Thread.currentThread().interrupt();
                };
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        CompletableFuture<?> future;
        try (InputFormatCacheLoader cacheLoader =
                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
            cacheLoader.initializeMetrics(metricGroup);
            future = cacheLoader.reloadAsync();
            reloadLatch.await();
            // Leaving this block closes the loader while the reload is still blocked.
        }
        Assertions.assertThat(future.isDone())
                .as(
                        "The reload future should still complete successfully indicating that the reload was intentionally stopped without an error.")
                .isTrue();
        Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
    }

    /** Supplies the split counts for {@link #testCloseDuringReload(int)}. */
    static Stream<Arguments> numSplits() {
        return Stream.of(Arguments.of(1), Arguments.of(2));
    }

    /** Supplies the split-count deltas for {@link #testReadWithDifferentSplits(int)}. */
    static Stream<Arguments> deltaNumSplits() {
        return Stream.of(Arguments.of(-1), Arguments.of(0), Arguments.of(1));
    }

    /**
     * Asserts that {@code actual} has exactly the keys of {@link TestCacheLoader#DATA} and that,
     * for every key, both maps hold the same rows regardless of order.
     *
     * @param actual cache content to verify
     */
    private void assertCacheContent(Map<RowData, Collection<RowData>> actual) {
        Assertions.assertThat(actual).containsOnlyKeys(TestCacheLoader.DATA.keySet());
        TestCacheLoader.DATA.forEach(
                (key, rows) ->
                        Assertions.assertThat(rows)
                                .containsExactlyInAnyOrderElementsOf(actual.get(key)));
    }

    /**
     * Starts an asynchronous reload and blocks until its future completes.
     *
     * @param cacheLoader loader to reload
     */
    private void reloadSynchronously(CacheLoader cacheLoader) {
        cacheLoader.reloadAsync().join();
    }

    /**
     * Creates an opened loader with {@link #DEFAULT_NUM_SPLITS} splits and a no-op reload action.
     *
     * @param deltaNumSplits split-count delta forwarded to the test input format
     * @return the opened cache loader; the caller is responsible for closing it
     */
    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits) throws Exception {
        return createCacheLoader(DEFAULT_NUM_SPLITS, deltaNumSplits, () -> {});
    }

    /**
     * Creates and opens a loader that reads the rows of {@link TestCacheLoader#DATA} through a
     * {@link FullCacheTestInputFormat} and keys them by their first (INT) field.
     *
     * @param numSplits number of splits the test input format is created with
     * @param deltaNumSplits split-count delta forwarded to the test input format
     * @param reloadAction hook run each time the key projection is applied to a row
     * @return the opened cache loader; the caller is responsible for closing it
     */
    private InputFormatCacheLoader createCacheLoader(
            int numSplits, int deltaNumSplits, Runnable reloadAction) throws Exception {
        DataType rightRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.INT()),
                        DataTypes.FIELD("f1", DataTypes.STRING().bridgedTo(String.class)));
        // The explicit type witness keeps the downcast legal: without it the generic result type
        // would be inferred with Object as its type argument.
        RowDataSerializer rightRowSerializer =
                (RowDataSerializer)
                        InternalSerializers.<RowData>create(rightRowDataType.getLogicalType());
        DataFormatConverters.RowConverter converter =
                new DataFormatConverters.RowConverter(
                        rightRowDataType.getChildren().toArray(new DataType[0]));

        // Flatten all rows of the reference data and convert them to their external form.
        FullCacheTestInputFormat inputFormat =
                new FullCacheTestInputFormat(
                        TestCacheLoader.DATA.values().stream()
                                .flatMap(Collection::stream)
                                .map(rowData -> converter.toExternal(rowData))
                                .collect(Collectors.toList()),
                        Optional.empty(),
                        converter,
                        numSplits,
                        deltaNumSplits);

        RowType keyType = (RowType) DataTypes.ROW(DataTypes.INT()).getLogicalType();
        GeneratedProjection generatedProjection =
                new GeneratedProjection("", "", new Object[0]) {
                    @Override
                    public Projection newInstance(ClassLoader classLoader) {
                        return row -> {
                            // Lets tests inject behavior (fail, block) into the reload.
                            reloadAction.run();
                            return StreamRecordUtils.row(row.getInt(0));
                        };
                    }
                };
        GenericRowDataKeySelector keySelector =
                new GenericRowDataKeySelector(
                        InternalTypeInfo.of(keyType),
                        InternalSerializers.create(keyType),
                        generatedProjection);

        InputFormatCacheLoader cacheLoader =
                new InputFormatCacheLoader(inputFormat, keySelector, rightRowSerializer);
        cacheLoader.open(new Configuration(), Thread.currentThread().getContextClassLoader());
        return cacheLoader;
    }
}

