/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that compile SQL to a JSON plan, optionally rewrite the state metadata of
 * individual exec nodes in that plan, execute it, and compare the sink contents against a fixed
 * expectation.
 *
 * <p>Each scenario is exercised twice: once by patching the compiled plan only, and once by
 * putting a {@code STATE_TTL} hint into the SQL statement.
 */
class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {

    /**
     * Deduplicate followed by group aggregate, with both operators' state metadata patched in the
     * compiled plan ({@code 6000L} for the deduplicate node, {@code 9000L} for the aggregate node).
     */
    @Test
    void testDifferentStateTtlThroughCompiledPlanForDifferentOneInputStreamOperator()
            throws Exception {
        innerTestDeduplicateAndGroupAggregate(
                "INSERT INTO OrdersStats \n"
                        + "SELECT buyer, COUNT(1) AS ord_cnt, SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount FROM (\n"
                        + "SELECT *, ROW_NUMBER() OVER(PARTITION BY order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders) tmp\n"
                        + "WHERE rk = 1\n"
                        + "GROUP BY buyer",
                stateTtlPlanTransformer(
                        plan -> {
                            JsonTestUtils.setExecNodeStateMetadata(
                                    plan, "stream-exec-deduplicate", 0, 6000L);
                            JsonTestUtils.setExecNodeStateMetadata(
                                    plan, "stream-exec-group-aggregate", 0, 9000L);
                        }));
    }

    /**
     * Same query shape as the compiled-plan variant, but the SQL carries a {@code STATE_TTL} hint
     * ({@code 'tmp' = '9s'}) and only the deduplicate node is patched in the compiled plan. Mini
     * batch is switched on in the table config before the statement is compiled.
     */
    @Test
    void testDifferentStateTtlThroughSqlHintForDifferentOneInputStreamOperator() throws Exception {
        tableEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
        tableEnv.getConfig().set("table.exec.mini-batch.size", "2");
        tableEnv.getConfig().set("table.exec.mini-batch.allow-latency", "1");
        innerTestDeduplicateAndGroupAggregate(
                "INSERT INTO OrdersStats \n"
                        + "SELECT /*+STATE_TTL('tmp' = '9s')*/ buyer, COUNT(1) AS ord_cnt, SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount \n"
                        + "FROM (\n"
                        + "    SELECT *, ROW_NUMBER() OVER(PARTITION BY order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders\n"
                        + ") tmp\n"
                        + "WHERE rk = 1\n"
                        + "GROUP BY buyer",
                stateTtlPlanTransformer(
                        plan ->
                                JsonTestUtils.setExecNodeStateMetadata(
                                        plan, "stream-exec-deduplicate", 0, 6000L)));
    }

    /**
     * Regular join whose two state entries (indices 0 and 1 of the {@code stream-exec-join} node)
     * are patched in the compiled plan to {@code 3000L} and {@code 9000L} respectively.
     */
    @Test
    void testDifferentStateTtlThroughCompiledPlanForSameTwoInputStreamOperator() throws Exception {
        innerTestRegularJoin(
                "INSERT INTO OrdersShipInfo \n"
                        + "SELECT a.order_id, a.line_order_id, b.ship_mode FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id",
                stateTtlPlanTransformer(
                        plan -> {
                            JsonTestUtils.setExecNodeStateMetadata(
                                    plan, "stream-exec-join", 0, 3000L);
                            JsonTestUtils.setExecNodeStateMetadata(
                                    plan, "stream-exec-join", 1, 9000L);
                        }));
    }

    /**
     * Regular join configured purely through a {@code STATE_TTL} hint ({@code 'a' = '3s'},
     * {@code 'b' = '9s'}); the compiled plan is executed without modification.
     */
    @Test
    void testDifferentStateTtlThroughSqlHintForSameTwoInputStreamOperator() throws Exception {
        innerTestRegularJoin(
                "INSERT INTO OrdersShipInfo \n"
                        + "SELECT /*+ STATE_TTL('a' = '3s', 'b' = '9s') */\n"
                        + " a.order_id, a.line_order_id, b.ship_mode FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id",
                // The hint is the only configuration here, so the plan is passed through as is.
                json -> json);
    }

    /**
     * Registers the {@code Orders} source and {@code OrdersStats} sink, compiles and runs
     * {@code sql}, and asserts the sink contents.
     *
     * @param sql the INSERT statement to compile
     * @param jsonPlanTransformer rewrites the compiled JSON plan before it is executed
     * @throws Exception if compiling, executing or awaiting the job fails
     */
    private void innerTestDeduplicateAndGroupAggregate(
            String sql, Function<String, String> jsonPlanTransformer) throws Exception {
        // Rows are (order_id, buyer, quantity, amount); several rows are exact duplicates.
        String dataId =
                TestValuesTableFactory.registerRowData(
                        Arrays.asList(
                                GenericRowData.of(1, StringData.fromString("Tom"), 1, 199.9),
                                GenericRowData.of(2, StringData.fromString("Jerry"), 2, 99.9),
                                GenericRowData.of(1, StringData.fromString("Tom"), 1, 199.9),
                                GenericRowData.of(3, StringData.fromString("Tom"), 1, 29.9),
                                GenericRowData.of(4, StringData.fromString("Olivia"), 1, 100.0),
                                GenericRowData.of(4, StringData.fromString("Olivia"), 1, 100.0),
                                GenericRowData.of(2, StringData.fromString("Jerry"), 2, 99.9),
                                GenericRowData.of(5, StringData.fromString("Michael"), 3, 599.9),
                                GenericRowData.of(6, StringData.fromString("Olivia"), 3, 1000.0)));
        createTestSourceTable(
                "Orders",
                new String[] {
                    "`order_id` INT", "`buyer` STRING", "`quantity` INT", "`amount` DOUBLE"
                },
                null,
                getProperties(dataId, 1, "2s"));
        createTestNonInsertOnlyValuesSinkTable(
                "OrdersStats",
                "`buyer` STRING",
                "`ord_cnt` BIGINT",
                "`quantity_cnt` BIGINT",
                "`total_amount` DOUBLE");

        compileSqlAndExecutePlan(sql, jsonPlanTransformer).await();

        // NOTE(review): "Jerry" is expected twice. Presumably the two identical Jerry rows arrive
        // far enough apart (given the source pacing) for the first one to have expired from the
        // deduplicate state, and the aggregate state for Jerry has expired as well - confirm
        // against the TTL semantics of the operators.
        List<String> expected =
                Arrays.asList(
                        "+I[Tom, 2, 2, 229.8]",
                        "+I[Jerry, 1, 2, 99.9]",
                        "+I[Jerry, 1, 2, 99.9]",
                        "+I[Olivia, 2, 4, 1100.0]",
                        "+I[Michael, 1, 3, 599.9]");
        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("OrdersStats"));
    }

    /**
     * Registers the {@code Orders} and {@code LineOrders} sources and the {@code OrdersShipInfo}
     * sink, compiles and runs {@code sql}, and asserts the sink contents.
     *
     * @param sql the INSERT statement to compile
     * @param jsonPlanTransformer rewrites the compiled JSON plan before it is executed
     * @throws Exception if compiling, executing or awaiting the job fails
     */
    private void innerTestRegularJoin(String sql, Function<String, String> jsonPlanTransformer)
            throws Exception {
        // Left side rows are (order_id, line_order_id).
        String leftTableDataId =
                TestValuesTableFactory.registerRowData(
                        Arrays.asList(
                                GenericRowData.of(1, 1000001),
                                GenericRowData.of(1, 1000002),
                                GenericRowData.of(1, 1000003),
                                GenericRowData.of(1, 1000004),
                                GenericRowData.of(1, 1000005),
                                GenericRowData.of(2, 2000001)));
        createTestSourceTable(
                "Orders",
                new String[] {"`order_id` INT", "`line_order_id` INT"},
                null,
                getProperties(leftTableDataId, 1, "2s"));

        // Right side rows are (line_order_id, ship_mode), in a different order than the left side.
        String rightTableDataId =
                TestValuesTableFactory.registerRowData(
                        Arrays.asList(
                                GenericRowData.of(2000001, StringData.fromString("TRUCK")),
                                GenericRowData.of(1000005, StringData.fromString("AIR")),
                                GenericRowData.of(1000001, StringData.fromString("SHIP")),
                                GenericRowData.of(1000002, StringData.fromString("TRUCK")),
                                GenericRowData.of(1000003, StringData.fromString("RAIL")),
                                GenericRowData.of(1000004, StringData.fromString("RAIL"))));
        createTestSourceTable(
                "LineOrders",
                new String[] {"`line_order_id` INT", "`ship_mode` STRING"},
                null,
                getProperties(rightTableDataId, 2, "4s"));

        createTestValuesSinkTable(
                "OrdersShipInfo", "`order_id` INT", "`line_order_id` INT", "`ship_mode` STRING");

        compileSqlAndExecutePlan(sql, jsonPlanTransformer).await();

        // NOTE(review): only three of the six matching key pairs are expected. Presumably the
        // remaining pairs miss each other because one side's state has already expired when the
        // other side's row arrives - confirm against the join's TTL semantics.
        List<String> expected =
                Arrays.asList(
                        "+I[1, 1000002, TRUCK]", "+I[1, 1000004, RAIL]", "+I[1, 1000005, AIR]");
        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("OrdersShipInfo"));
    }

    /**
     * Builds a JSON-plan transformer that parses the plan, lets {@code editor} mutate the parsed
     * tree, and serializes the tree back to a string.
     *
     * @param editor the modification to apply to the parsed plan
     * @return a function from the original plan JSON to the modified plan JSON; it throws a
     *     {@link TableException} wrapping any {@link IOException} raised while reading, editing
     *     or writing the plan
     */
    private static Function<String, String> stateTtlPlanTransformer(CompiledPlanEditor editor) {
        return json -> {
            try {
                JsonNode target = JsonTestUtils.readFromString(json);
                editor.edit(target);
                return JsonTestUtils.writeToString(target);
            } catch (IOException e) {
                throw new TableException("Cannot modify compiled json plan.", e);
            }
        };
    }

    /**
     * Builds the option map for a test source table.
     *
     * @param dataId value of the {@code data-id} option, as returned by
     *     {@link TestValuesTableFactory#registerRowData}
     * @param sleepAfterElements value of the {@code source.sleep-after-elements} option
     * @param sleepTime value of the {@code source.sleep-time} option, e.g. {@code "2s"}
     * @return a new mutable map holding the six connector options
     */
    private static Map<String, String> getProperties(
            String dataId, int sleepAfterElements, String sleepTime) {
        // A plain HashMap instead of double-brace initialization: no anonymous subclass per call.
        Map<String, String> properties = new HashMap<>();
        properties.put("connector", "values");
        properties.put("bounded", "false");
        properties.put("register-internal-data", "true");
        properties.put("source.sleep-after-elements", String.valueOf(sleepAfterElements));
        properties.put("source.sleep-time", sleepTime);
        properties.put("data-id", dataId);
        return properties;
    }

    /** In-place modification of a parsed compiled plan that is allowed to fail with I/O errors. */
    @FunctionalInterface
    private interface CompiledPlanEditor {

        /**
         * Mutates the given plan tree.
         *
         * @param plan root node of the parsed compiled plan
         * @throws IOException if the plan cannot be modified
         */
        void edit(JsonNode plan) throws IOException;
    }
}

