/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.examples.java.basics;

import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

/**
 * Example that enriches a stream of transactions with a currency rate by means of a SQL
 * temporal join ({@code FOR SYSTEM_TIME AS OF}).
 *
 * <p>The program:
 *
 * <ol>
 *   <li>builds an upsert changelog stream of currency rates and registers it as the view
 *       {@code currency_rate}, keyed by currency code;
 *   <li>builds a stream of {@link Transaction} objects and registers it as the view
 *       {@code transaction};
 *   <li>joins every transaction with the rate looked up as of the transaction time;
 *   <li>converts the result to a stream of {@link EnrichedTransaction} and prints it.
 * </ol>
 */
public class TemporalJoinSQLExample {

    /**
     * Builds the two input views, runs the temporal join and prints the enriched transactions.
     *
     * @param args command line arguments; not used
     * @throws Exception if {@code env.execute()} fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Changelog of currency rates: (rate time, currency code, euro rate).
        // Note that the records are deliberately not sorted by rate time (4000 precedes 3000).
        DataStream<Row> currencyRate =
                env.fromData(
                        Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(1000L), "USD", 0.8),
                        Row.ofKind(RowKind.UPDATE_AFTER, Instant.ofEpochMilli(4000L), "USD", 0.9),
                        Row.ofKind(RowKind.UPDATE_AFTER, Instant.ofEpochMilli(3000L), "USD", 1.0),
                        Row.ofKind(RowKind.UPDATE_AFTER, Instant.ofEpochMilli(6000L), "USD", 1.1));

        // Interpret the rows as an upsert changelog. The schema declares the currency code
        // (f1) as primary key and a watermark on the rate time (f0) that lags by 2 seconds;
        // afterwards the positional field names f0..f2 are replaced by descriptive ones.
        Schema rateSchema =
                Schema.newBuilder()
                        .column("f0", DataTypes.TIMESTAMP_LTZ(3))
                        .column("f1", DataTypes.STRING().notNull())
                        .column("f2", DataTypes.DOUBLE())
                        .watermark("f0", "f0 - INTERVAL '2' SECONDS")
                        .primaryKey("f1")
                        .build();
        Table rateTable =
                tableEnv.fromChangelogStream(currencyRate, rateSchema, ChangelogMode.upsert())
                        .as("rate_time", "currency_code", "euro_rate");
        tableEnv.createTemporaryView("currency_rate", rateTable);

        // Stream of transactions, one per second, all in USD with amount 1.0.
        DataStream<Transaction> transaction =
                env.fromData(
                        new Transaction("trx1", Instant.ofEpochMilli(1000L), "USD", 1.0),
                        new Transaction("trx2", Instant.ofEpochMilli(2000L), "USD", 1.0),
                        new Transaction("trx3", Instant.ofEpochMilli(3000L), "USD", 1.0),
                        new Transaction("trx4", Instant.ofEpochMilli(4000L), "USD", 1.0));

        // Expose the transaction fields as columns, declare a watermark on the transaction
        // time that lags by 2 seconds, and rename the columns to the names used in the query.
        Schema trxSchema =
                Schema.newBuilder()
                        .column("id", DataTypes.STRING())
                        .column("trxTime", DataTypes.TIMESTAMP_LTZ(3))
                        .column("currencyCode", DataTypes.STRING())
                        .column("amount", DataTypes.DOUBLE())
                        .watermark("trxTime", "trxTime - INTERVAL '2' SECONDS")
                        .build();
        Table trxTable =
                tableEnv.fromDataStream(transaction, trxSchema)
                        .as("id", "trx_time", "currency_code", "amount");
        tableEnv.createTemporaryView("transaction", trxTable);

        // Temporal join: each transaction is joined with the currency_rate row selected
        // FOR SYSTEM_TIME AS OF the transaction time. The literal is split per line for
        // readability only; the concatenation is a compile-time constant, so the SQL text
        // handed to sqlQuery is unchanged.
        Table result =
                tableEnv.sqlQuery(
                        "    SELECT\n"
                                + "        t.id,\n"
                                + "        t.trx_time,\n"
                                + "        c.currency_code,\n"
                                + "        t.amount,\n"
                                + "        t.amount * c.euro_rate AS total_euro\n"
                                + "    FROM transaction t\n"
                                + "    JOIN currency_rate FOR SYSTEM_TIME AS OF t.trx_time AS c\n"
                                + "    ON t.currency_code = c.currency_code; ");

        // Convert the result table into a stream of EnrichedTransaction and print each record.
        tableEnv.toDataStream(result, EnrichedTransaction.class).print();
        env.execute();
    }

    /**
     * A single transaction in some currency.
     *
     * <p>NOTE(review): the public mutable fields and the public no-argument constructor look
     * like they are needed for Flink to treat this class as a POJO type; confirm against the
     * Flink POJO rules before tightening visibility or making the fields final.
     */
    public static class Transaction {
        /** Identifier of the transaction. */
        public String id;
        /** Point in time at which the transaction happened. */
        public Instant trxTime;
        /** Code of the currency the amount is expressed in, e.g. {@code "USD"}. */
        public String currencyCode;
        /** Amount of the transaction in the currency given by {@link #currencyCode}. */
        public double amount;

        /** Creates an empty transaction; all fields keep their default values. */
        public Transaction() {
        }

        /**
         * Creates a fully initialized transaction.
         *
         * @param id identifier of the transaction
         * @param trxTime time of the transaction
         * @param currencyCode currency code of the amount
         * @param amount amount of the transaction
         */
        public Transaction(String id, Instant trxTime, String currencyCode, double amount) {
            this.id = id;
            this.trxTime = trxTime;
            this.currencyCode = currencyCode;
            this.amount = amount;
        }

        /** Returns a textual representation listing all fields. */
        @Override
        public String toString() {
            return "Transaction{id=" + this.id + ", trxTime=" + this.trxTime + ", currencyCode='" + this.currencyCode + "', amount=" + this.amount + "}";
        }
    }

    /**
     * A {@link Transaction} extended with the total computed by the join query
     * ({@code amount * euro_rate}).
     */
    public static class EnrichedTransaction
    extends Transaction {
        /** Value of the {@code total_euro} column of the join result. */
        public double totalEuro;

        /** Creates an empty enriched transaction; all fields keep their default values. */
        public EnrichedTransaction() {
        }

        /**
         * Creates a fully initialized enriched transaction.
         *
         * @param id identifier of the transaction
         * @param trxTime time of the transaction
         * @param currencyCode currency code of the amount
         * @param amount amount of the transaction
         * @param totalEuro total of the transaction after applying the euro rate
         */
        public EnrichedTransaction(String id, Instant trxTime, String currencyCode, double amount, double totalEuro) {
            super(id, trxTime, currencyCode, amount);
            this.totalEuro = totalEuro;
        }

        /** Returns a textual representation listing all fields, including the total. */
        @Override
        public String toString() {
            return "EnrichedTransaction{id=" + this.id + ", trxTime=" + this.trxTime + ", currencyCode='" + this.currencyCode + "', amount=" + this.amount + ", totalEuro=" + this.totalEuro + "}";
        }
    }
}

