/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.logical;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleBase;
import org.apache.flink.table.planner.plan.utils.PythonUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;

public class PushWatermarkIntoTableSourceScanAcrossCalcRule
extends PushWatermarkIntoTableSourceScanRuleBase {
    public static final PushWatermarkIntoTableSourceScanAcrossCalcRule INSTANCE = new PushWatermarkIntoTableSourceScanAcrossCalcRule();

    public PushWatermarkIntoTableSourceScanAcrossCalcRule() {
        super(PushWatermarkIntoTableSourceScanAcrossCalcRule.operand(FlinkLogicalWatermarkAssigner.class, PushWatermarkIntoTableSourceScanAcrossCalcRule.operand(FlinkLogicalCalc.class, PushWatermarkIntoTableSourceScanAcrossCalcRule.operand(FlinkLogicalTableSourceScan.class, PushWatermarkIntoTableSourceScanAcrossCalcRule.none()), new RelOptRuleOperand[0]), new RelOptRuleOperand[0]), "PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule");
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        FlinkLogicalTableSourceScan scan = (FlinkLogicalTableSourceScan)call.rel(2);
        FlinkLogicalCalc calc = (FlinkLogicalCalc)call.rel(1);
        return this.supportsWatermarkPushDown(scan) && calc.getProgram().getExprList().stream().noneMatch(rexNode -> PythonUtil.containsPythonCall(rexNode, null));
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        FlinkLogicalWatermarkAssigner watermarkAssigner = (FlinkLogicalWatermarkAssigner)call.rel(0);
        FlinkLogicalCalc calc = (FlinkLogicalCalc)call.rel(1);
        RexProgram originProgram = calc.getProgram();
        final List projectList = originProgram.getProjectList().stream().map(originProgram::expandLocalRef).collect(Collectors.toList());
        RexNode rowTimeColumn = (RexNode)projectList.get(watermarkAssigner.rowtimeFieldIndex());
        RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle(){

            @Override
            public RexNode visitInputRef(RexInputRef inputRef) {
                return (RexNode)projectList.get(inputRef.getIndex());
            }
        });
        FlinkLogicalTableSourceScan newScan = this.getNewScan(watermarkAssigner, newWatermarkExpr, (FlinkLogicalTableSourceScan)call.rel(2), ShortcutUtils.unwrapContext(calc).getTableConfig(), false);
        FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(calc);
        RexBuilder builder = call.builder().getRexBuilder();
        RexNode newRowTimeColumn = builder.makeReinterpretCast(typeFactory.createRowtimeIndicatorType(rowTimeColumn.getType().isNullable(), rowTimeColumn.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE), rowTimeColumn, null);
        RexProgramBuilder programBuilder = new RexProgramBuilder(newScan.getRowType(), builder);
        List<String> outputFieldNames = originProgram.getOutputRowType().getFieldNames();
        for (int i = 0; i < projectList.size(); ++i) {
            if (i == watermarkAssigner.rowtimeFieldIndex()) {
                programBuilder.addProject(newRowTimeColumn, outputFieldNames.get(i));
                continue;
            }
            programBuilder.addProject((RexNode)projectList.get(i), outputFieldNames.get(i));
        }
        if (originProgram.getCondition() != null) {
            programBuilder.addCondition(originProgram.expandLocalRef(originProgram.getCondition()));
        }
        FlinkLogicalCalc newCalc = FlinkLogicalCalc.create(newScan, programBuilder.getProgram());
        call.transformTo(newCalc);
    }
}

