/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration test checking the watermarks emitted by a {@code values} table source that declares
 * a watermark on a timestamp column and has watermark push-down enabled.
 */
class WatermarkITCase
extends StreamingTestBase {

    /**
     * Feeds three rows whose timestamps arrive out of order (Jan 1st, Jan 3rd, Jan 2nd) and
     * asserts that the recorded watermark sequence never decreases: once the watermark has
     * reached Jan 3rd, the late Jan 2nd row must not pull it back.
     *
     * <p>The query result is also checked, so that {@code current_watermark(c)} reports Jan 3rd
     * for the late row.
     */
    @Test
    void testWatermarkNotMovingBack() {
        // Deliberately out of event-time order: the row with a = 2 arrives after the row with a = 3.
        List<Row> data =
                Arrays.asList(
                        Row.of(1, LocalDateTime.parse("2024-01-01T00:00:00")),
                        Row.of(3, LocalDateTime.parse("2024-01-03T00:00:00")),
                        Row.of(2, LocalDateTime.parse("2024-01-02T00:00:00")));
        String dataId = TestValuesTableFactory.registerData(data);

        String ddl =
                String.format(
                        "CREATE Table VirtualTable (\n"
                                + "  a INT,\n"
                                + "  c TIMESTAMP(3),\n"
                                + "  WATERMARK FOR c as c\n"
                                + ") with (\n"
                                + "  'connector' = 'values',\n"
                                + "  'bounded' = 'false',\n"
                                + "  'scan.watermark.emit.strategy' = 'on-periodic',\n"
                                + "  'enable-watermark-push-down' = 'true',\n"
                                + "  'disable-lookup' = 'true',\n"
                                + "  'data-id' = '%s'\n"
                                + ")\n",
                        dataId);
        tEnv().executeSql(ddl);

        // NOTE(review): presumably parallelism 1 keeps the row order (and therefore the
        // watermark sequence) deterministic - confirm.
        tEnv().getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);

        String query = "SELECT a, c, current_watermark(c) FROM VirtualTable order by c";
        Iterator<Row> rows = tEnv().executeSql(query).collect();
        List<Row> result = CollectionUtil.iteratorToList(rows);

        // Render every watermark recorded for the table as a LocalDateTime string.
        List<String> actualWatermarks =
                TestValuesTableFactory.getWatermarkOutput("VirtualTable").stream()
                        .map(
                                x ->
                                        TimestampData.fromEpochMillis(x.getTimestamp())
                                                .toLocalDateTime()
                                                .toString())
                        .collect(Collectors.toList());

        // The third watermark stays at Jan 3rd although the third input row is from Jan 2nd.
        Assertions.assertThat(actualWatermarks)
                .containsExactly("2024-01-01T00:00", "2024-01-03T00:00", "2024-01-03T00:00");

        // Rows come back ordered by c; the third column is current_watermark(c) for that row.
        Assertions.assertThat(result)
                .containsExactly(
                        Row.of(
                                1,
                                LocalDateTime.parse("2024-01-01T00:00"),
                                LocalDateTime.parse("2024-01-01T00:00")),
                        Row.of(
                                2,
                                LocalDateTime.parse("2024-01-02T00:00"),
                                LocalDateTime.parse("2024-01-03T00:00")),
                        Row.of(
                                3,
                                LocalDateTime.parse("2024-01-03T00:00"),
                                LocalDateTime.parse("2024-01-03T00:00")));
    }
}

