/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.time.Duration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogNormalizeTestPrograms {
    static final String[] SOURCE_SCHEMA = new String[]{"a VARCHAR", "b INT NOT NULL", "c VARCHAR", "PRIMARY KEY(a) NOT ENFORCED"};
    static final String[] SINK_SCHEMA = new String[]{"a VARCHAR", "b INT", "c VARCHAR"};
    static final Row[] BEFORE_DATA = new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"one", 1, "a"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"two", 2, "b"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{"one", 1, "a"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"one", 1, "aa"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"three", 3, "c"}), Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{"two", 2, "b"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{"three", 3, "c"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"three", 3, "cc"})};
    static final Row[] AFTER_DATA = new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"four", 4, "d"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"five", 5, "e"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{"four", 4, "d"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"four", 4, "dd"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"six", 6, "f"}), Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{"six", 6, "f"})};
    static final String[] BEFORE_OUTPUT = new String[]{"+I[one, 1, a]", "+I[two, 2, b]", "-U[one, 1, a]", "+U[one, 1, aa]", "+I[three, 3, c]", "-D[two, 2, b]", "-U[three, 3, c]", "+U[three, 3, cc]"};
    static final String[] AFTER_OUTPUT = new String[]{"+I[four, 4, d]", "+I[five, 5, e]", "-U[four, 4, d]", "+U[four, 4, dd]", "+I[six, 6, f]", "-D[six, 6, f]"};
    static final TableTestProgram CHANGELOG_SOURCE = TableTestProgram.of((String)"changelog-normalize-source", (String)"validates changelog normalize source").setupConfig(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)true).setupTableSource(((SourceTestStep.Builder)((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addOption("changelog-mode", "I,UA,UB,D")).addSchema(SOURCE_SCHEMA)).producedBeforeRestore(BEFORE_DATA).producedAfterRestore(AFTER_DATA).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(SINK_SCHEMA)).consumedBeforeRestore(BEFORE_OUTPUT).consumedAfterRestore(AFTER_OUTPUT).build()).runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t").build();
    static final TableTestProgram CHANGELOG_SOURCE_MINI_BATCH = TableTestProgram.of((String)"changelog-normalize-source-mini-batch", (String)"validates changelog normalize source with mini batch").setupConfig(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)true).setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)true).setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(10L)).setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, (Object)2L).setupTableSource(((SourceTestStep.Builder)((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addOption("changelog-mode", "I,UA,UB,D")).addSchema(SOURCE_SCHEMA)).producedBeforeRestore(BEFORE_DATA).producedAfterRestore(AFTER_DATA).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(SINK_SCHEMA)).consumedBeforeRestore(BEFORE_OUTPUT).consumedAfterRestore(AFTER_OUTPUT).build()).runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t").build();
    static final TableTestProgram UPSERT_SOURCE = TableTestProgram.of((String)"changelog-normalize-upsert", (String)"validates changelog normalize upsert").setupConfig(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)true).setupTableSource(((SourceTestStep.Builder)((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addOption("changelog-mode", "I,UA,D")).addSchema(SOURCE_SCHEMA)).producedBeforeRestore(new Row[]{Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"one", 1, "a"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"two", 2, "b"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"one", 1, "aa"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"three", 3, "c"}), Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{"two", 2, "b"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"three", 3, "cc"})}).producedAfterRestore(new Row[]{Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"four", 4, "d"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"five", 5, "e"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"six", 6, "f"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"five", 5, "ee"}), Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{"six", 6, "f"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{"four", 4, "dd"})}).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(SINK_SCHEMA)).consumedBeforeRestore(BEFORE_OUTPUT).consumedAfterRestore(new String[]{"+I[four, 4, d]", "+I[five, 5, e]", "+I[six, 6, f]", "-U[five, 5, e]", "+U[five, 5, ee]", "-D[six, 6, f]", "-U[four, 4, d]", "+U[four, 4, dd]"}).build()).runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t").build();
}

