/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.bundle.BufferBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasNoUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.JoinKeyContainsUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.metrics.SimpleGauge;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/**
 * Streaming join operator that buffers the records of both inputs into bundles instead of joining
 * them one by one. Buffered records are handed to {@link #processBundles} when
 * {@link #finishBundle()} runs, which happens on the bundle trigger callback, on watermarks,
 * before a checkpoint barrier is snapshotted and when the operator finishes.
 */
public abstract class MiniBatchStreamingJoinOperator
extends StreamingJoinOperator
implements BundleTriggerCallback {
    private static final long serialVersionUID = -1106342589994963997L;
    // Notified of every buffered element; this operator registers itself as its callback in open().
    private final CoBundleTrigger<RowData, RowData> coBundleTrigger;
    // Per-input buffers, created in open() according to each side's JoinInputSideSpec.
    private transient BufferBundle<?> leftBuffer;
    private transient BufferBundle<?> rightBuffer;
    // Gauges registered as "leftBundleReducedSize" / "rightBundleReducedSize"; updated with the
    // buffers' reducedSize() each time a non-empty bundle is finished.
    private transient SimpleGauge<Integer> leftBundleReducedSizeGauge;
    private transient SimpleGauge<Integer> rightBundleReducedSizeGauge;
    // Used to copy every incoming record before it is placed into a buffer.
    private transient TypeSerializer<RowData> leftSerializer;
    private transient TypeSerializer<RowData> rightSerializer;
    /**
     * Creates the operator from a bundled parameter object.
     *
     * @param parameter holder whose fields are forwarded to the parent constructor; its
     *     {@code coBundleTrigger} is kept by this operator
     */
    public MiniBatchStreamingJoinOperator(MiniBatchStreamingJoinParameter parameter) {
        super(
                parameter.leftType,
                parameter.rightType,
                parameter.generatedJoinCondition,
                parameter.leftInputSideSpec,
                parameter.rightInputSideSpec,
                parameter.leftIsOuter,
                parameter.rightIsOuter,
                parameter.filterNullKeys,
                parameter.leftStateRetentionTime,
                parameter.rightStateRetentionTime);
        coBundleTrigger = parameter.coBundleTrigger;
    }

    /**
     * Opens the parent operator, then sets up the per-input buffers, the bundle trigger, the record
     * serializers and the two "reduced size" gauges.
     *
     * @throws Exception if the parent {@code open()} or any initialization step fails
     */
    @Override
    public void open() throws Exception {
        super.open();

        // One buffer per input; the concrete bundle type depends on the side's unique-key spec.
        leftBuffer = initialBuffer(leftInputSideSpec);
        rightBuffer = initialBuffer(rightInputSideSpec);

        // This operator is the callback that the trigger invokes to finish a bundle.
        coBundleTrigger.registerCallback(this);
        coBundleTrigger.reset();
        LOG.info("Initialize MiniBatchStreamingJoinOperator successfully.");

        leftSerializer = leftType.createSerializer(getExecutionConfig().getSerializerConfig());
        rightSerializer = rightType.createSerializer(getExecutionConfig().getSerializerConfig());

        leftBundleReducedSizeGauge = new SimpleGauge<>(0);
        rightBundleReducedSizeGauge = new SimpleGauge<>(0);
        getRuntimeContext().getMetricGroup().gauge("leftBundleReducedSize", leftBundleReducedSizeGauge);
        getRuntimeContext().getMetricGroup().gauge("rightBundleReducedSize", rightBundleReducedSizeGauge);
    }

    /**
     * Buffers one record of the left input instead of joining it immediately.
     *
     * <p>The record is copied with the left serializer, stored in the left buffer under the
     * current key (and its unique key, when the left side defines a unique-key selector), and the
     * bundle trigger is notified.
     *
     * @param element the incoming left-side stream record
     * @throws Exception if key extraction, buffering or the trigger fails
     */
    @Override
    public void processElement1(StreamRecord<RowData> element) throws Exception {
        // Buffer a copy rather than the incoming instance.
        // NOTE(review): presumably because the runtime may reuse input objects - confirm.
        RowData record = leftSerializer.copy(element.getValue());
        RowData joinKey = (RowData) getCurrentKey();
        RowData uniqueKey = null;
        if (leftInputSideSpec.getUniqueKeySelector() != null) {
            uniqueKey = leftInputSideSpec.getUniqueKeySelector().getKey(record);
        }
        leftBuffer.addRecord(joinKey, uniqueKey, record);
        coBundleTrigger.onElement1(record);
    }

    /**
     * Buffers one record of the right input instead of joining it immediately.
     *
     * <p>The record is copied with the right serializer, stored in the right buffer under the
     * current key (and its unique key, when the right side defines a unique-key selector), and the
     * bundle trigger is notified.
     *
     * @param element the incoming right-side stream record
     * @throws Exception if key extraction, buffering or the trigger fails
     */
    @Override
    public void processElement2(StreamRecord<RowData> element) throws Exception {
        // Buffer a copy rather than the incoming instance.
        // NOTE(review): presumably because the runtime may reuse input objects - confirm.
        RowData record = rightSerializer.copy(element.getValue());
        RowData joinKey = (RowData) getCurrentKey();
        RowData uniqueKey = null;
        if (rightInputSideSpec.getUniqueKeySelector() != null) {
            uniqueKey = rightInputSideSpec.getUniqueKeySelector().getKey(record);
        }
        rightBuffer.addRecord(joinKey, uniqueKey, record);
        coBundleTrigger.onElement2(record);
    }

    /**
     * Flushes the buffered bundles before handing the watermark of the first input to the parent.
     *
     * @param mark the watermark received on the first input
     * @throws Exception if finishing the bundle or the parent handling fails
     */
    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        finishBundle();
        super.processWatermark1(mark);
    }

    /**
     * Flushes the buffered bundles before handing the watermark of the second input to the parent.
     *
     * @param mark the watermark received on the second input
     * @throws Exception if finishing the bundle or the parent handling fails
     */
    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        finishBundle();
        super.processWatermark2(mark);
    }

    /**
     * Flushes the buffered bundles before the checkpoint barrier is handled, so the in-memory
     * buffers are empty at that point.
     *
     * @param checkpointId id of the checkpoint being prepared (not used here)
     * @throws Exception if finishing the bundle fails
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        finishBundle();
    }

    /**
     * Flushes any still-buffered records, then lets the parent finish.
     *
     * @throws Exception if finishing the bundle or the parent {@code finish()} fails
     */
    @Override
    public void finish() throws Exception {
        finishBundle();
        super.finish();
    }

    /**
     * Closes the parent operator and drops whatever is still held in the two buffers.
     *
     * <p>The buffers are only assigned in {@link #open()}, so they may still be {@code null} when
     * this runs after a failed or skipped {@code open()}; they are therefore null-checked. They
     * are cleared in a {@code finally} block so a failure in the parent {@code close()} does not
     * leave them populated.
     *
     * @throws Exception if the parent {@code close()} fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (leftBuffer != null) {
                leftBuffer.clear();
            }
            if (rightBuffer != null) {
                rightBuffer.clear();
            }
        }
    }

    /**
     * Processes and clears the buffered records of both inputs, then resets the bundle trigger.
     *
     * <p>When both buffers are empty only the trigger reset happens. Otherwise the gauges are
     * updated with each buffer's reduced size before the records are passed to
     * {@link #processBundles}.
     *
     * @throws Exception if processing the bundles fails
     */
    @Override
    public void finishBundle() throws Exception {
        boolean nothingBuffered = leftBuffer.isEmpty() && rightBuffer.isEmpty();
        if (!nothingBuffered) {
            leftBundleReducedSizeGauge.update(leftBuffer.reducedSize());
            rightBundleReducedSizeGauge.update(rightBuffer.reducedSize());

            processBundles(leftBuffer, rightBuffer);

            leftBuffer.clear();
            rightBuffer.clear();
        }
        // The trigger is reset even when there was nothing to flush.
        coBundleTrigger.reset();
    }

    /**
     * Joins the buffered records of both inputs. Called by {@link #finishBundle()} when at least
     * one buffer is non-empty; the buffers are cleared by the caller afterwards.
     *
     * @param leftBuffer records buffered from the left input
     * @param rightBuffer records buffered from the right input
     * @throws Exception if processing any record fails
     */
    protected abstract void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer) throws Exception;

    /**
     * Picks the buffer implementation for one input based on its unique-key properties.
     *
     * @param inputSideSpec unique-key description of the input side
     * @return a new {@code JoinKeyContainsUniqueKeyBundle} when the join key contains the unique
     *     key, otherwise a new {@code InputSideHasUniqueKeyBundle} when the side has a unique key,
     *     otherwise a new {@code InputSideHasNoUniqueKeyBundle}
     */
    private BufferBundle<?> initialBuffer(JoinInputSideSpec inputSideSpec) {
        final BufferBundle<?> bundle;
        if (inputSideSpec.joinKeyContainsUniqueKey()) {
            bundle = new JoinKeyContainsUniqueKeyBundle();
        } else if (inputSideSpec.hasUniqueKey()) {
            bundle = new InputSideHasUniqueKeyBundle();
        } else {
            bundle = new InputSideHasNoUniqueKeyBundle();
        }
        return bundle;
    }

    /**
     * Processes a sequence of records, flagging a retract message that is directly followed by an
     * accumulate message (and that accumulate message) with {@code isSuppress = true} when they
     * are passed to {@code processElement}. All other records are passed with
     * {@code isSuppress = false}.
     *
     * <p>Every record of the iterator is forwarded exactly once and in iteration order. A record
     * that is held back while waiting for its successor is flushed as soon as it is known that it
     * cannot be paired, including at the end of the iteration, so it is never dropped.
     *
     * @param iter records to process, in order
     * @param inputSideStateView state view of the side the records belong to
     * @param otherSideStateView state view of the opposite side
     * @param inputIsLeft whether the records come from the left input
     * @throws Exception if processing any record fails
     */
    private void processElementWithSuppress(
            Iterator<RowData> iter,
            JoinRecordStateView inputSideStateView,
            JoinRecordStateView otherSideStateView,
            boolean inputIsLeft)
            throws Exception {
        // Record held back from a previous step because it was not an accumulate message;
        // it is still waiting to learn whether an accumulate message follows it.
        RowData pre = null;
        while (iter.hasNext()) {
            RowData current = iter.next();
            boolean isSuppress = false;
            if (RowDataUtil.isRetractMsg(current) && iter.hasNext()) {
                if (pre != null) {
                    // The held-back record is followed by another retract (current), so it can
                    // no longer be paired. Flush it now to keep input order and avoid losing it.
                    processElement(pre, inputSideStateView, otherSideStateView, inputIsLeft, false);
                    pre = null;
                }
                RowData next = iter.next();
                if (RowDataUtil.isAccumulateMsg(next)) {
                    // retract + accumulate: both are processed as a suppressed pair
                    isSuppress = true;
                } else {
                    // retract + non-accumulate: hold the second one back for the next step
                    pre = next;
                }
                processElement(current, inputSideStateView, otherSideStateView, inputIsLeft, isSuppress);
                if (isSuppress) {
                    processElement(next, inputSideStateView, otherSideStateView, inputIsLeft, isSuppress);
                }
            } else {
                // current is either not a retract message, or it is the last record
                if (pre != null) {
                    if (RowDataUtil.isAccumulateMsg(current)) {
                        // the held-back record and current form a suppressed pair
                        isSuppress = true;
                    }
                    processElement(pre, inputSideStateView, otherSideStateView, inputIsLeft, isSuppress);
                    pre = null;
                }
                processElement(current, inputSideStateView, otherSideStateView, inputIsLeft, isSuppress);
            }
        }
        if (pre != null) {
            // The iterator ended right after a held-back record; it has no partner, so it is
            // processed un-suppressed instead of being silently dropped.
            processElement(pre, inputSideStateView, otherSideStateView, inputIsLeft, false);
        }
    }

    /**
     * Processes all buffered records of one input side against the state views.
     *
     * <p>The handling depends on the concrete buffer type:
     * <ul>
     *   <li>{@code InputSideHasNoUniqueKeyBundle}: every record is processed individually with
     *       {@code isSuppress = false}.</li>
     *   <li>{@code JoinKeyContainsUniqueKeyBundle}: the records of each join key are processed via
     *       {@code processElementWithSuppress}.</li>
     *   <li>{@code InputSideHasUniqueKeyBundle}: for each join key, each group of records returned
     *       by {@code getRecordsWithJoinKey} is processed via
     *       {@code processElementWithSuppress}.</li>
     * </ul>
     * Any other buffer type is ignored. The current key is set to the join key before the records
     * of that key are processed.
     *
     * @param inputBuffer buffered records of the side being processed
     * @param inputSideStateView state view of the side being processed
     * @param otherSideStateView state view of the opposite side
     * @param inputIsLeft whether {@code inputBuffer} belongs to the left input
     * @throws Exception if processing any record fails
     */
    protected void processSingleSideBundles(
            BufferBundle<?> inputBuffer,
            JoinRecordStateView inputSideStateView,
            JoinRecordStateView otherSideStateView,
            boolean inputIsLeft)
            throws Exception {
        if (inputBuffer instanceof InputSideHasNoUniqueKeyBundle) {
            for (Map.Entry<RowData, List<RowData>> keyed : inputBuffer.getRecords().entrySet()) {
                setCurrentKey(keyed.getKey());
                for (RowData row : keyed.getValue()) {
                    processElement(row, inputSideStateView, otherSideStateView, inputIsLeft, false);
                }
            }
        } else if (inputBuffer instanceof JoinKeyContainsUniqueKeyBundle) {
            for (Map.Entry<RowData, List<RowData>> keyed : inputBuffer.getRecords().entrySet()) {
                setCurrentKey(keyed.getKey());
                processElementWithSuppress(
                        keyed.getValue().iterator(), inputSideStateView, otherSideStateView, inputIsLeft);
            }
        } else if (inputBuffer instanceof InputSideHasUniqueKeyBundle) {
            for (RowData joinKey : inputBuffer.getJoinKeys()) {
                setCurrentKey(joinKey);
                // Only the record lists are needed here; the map keys are not used.
                for (List<RowData> records : inputBuffer.getRecordsWithJoinKey(joinKey).values()) {
                    processElementWithSuppress(
                            records.iterator(), inputSideStateView, otherSideStateView, inputIsLeft);
                }
            }
        }
    }

    /**
     * Factory that creates the mini-batch join operator matching the given join type.
     *
     * @param joinType kind of join; {@code INNER}, {@code LEFT}, {@code RIGHT} and {@code FULL}
     *     are supported
     * @param leftType row type information of the left input
     * @param rightType row type information of the right input
     * @param generatedJoinCondition generated join condition
     * @param leftInputSideSpec unique-key description of the left input
     * @param rightInputSideSpec unique-key description of the right input
     * @param leftIsOuter forwarded to the operator as its {@code leftIsOuter} flag
     * @param rightIsOuter forwarded to the operator as its {@code rightIsOuter} flag
     * @param filterNullKeys forwarded to the operator as its {@code filterNullKeys} setting
     * @param leftStateRetentionTime forwarded state retention time for the left side
     * @param rightStateRetentionTime forwarded state retention time for the right side
     * @param coBundleTrigger trigger that decides when a bundle is finished
     * @return the operator implementation for {@code joinType}
     * @throws UnsupportedOperationException if {@code joinType} is none of the supported types
     */
    public static MiniBatchStreamingJoinOperator newMiniBatchStreamJoinOperator(
            FlinkJoinType joinType,
            InternalTypeInfo<RowData> leftType,
            InternalTypeInfo<RowData> rightType,
            GeneratedJoinCondition generatedJoinCondition,
            JoinInputSideSpec leftInputSideSpec,
            JoinInputSideSpec rightInputSideSpec,
            boolean leftIsOuter,
            boolean rightIsOuter,
            boolean[] filterNullKeys,
            long leftStateRetentionTime,
            long rightStateRetentionTime,
            CoBundleTrigger<RowData, RowData> coBundleTrigger) {
        MiniBatchStreamingJoinParameter joinParameter =
                new MiniBatchStreamingJoinParameter(
                        leftType,
                        rightType,
                        generatedJoinCondition,
                        leftInputSideSpec,
                        rightInputSideSpec,
                        leftIsOuter,
                        rightIsOuter,
                        filterNullKeys,
                        leftStateRetentionTime,
                        rightStateRetentionTime,
                        coBundleTrigger);
        switch (joinType) {
            case INNER:
                return new MiniBatchInnerJoinStreamOperator(joinParameter);
            case LEFT:
                return new MiniBatchLeftOuterJoinStreamOperator(joinParameter);
            case RIGHT:
                return new MiniBatchRightOuterJoinStreamOperator(joinParameter);
            case FULL:
                return new MiniBatchFullOuterJoinStreamOperator(joinParameter);
            default:
                throw new UnsupportedOperationException("Unsupported join type: " + joinType);
        }
    }

    /**
     * Plain holder that bundles all constructor arguments of
     * {@link MiniBatchStreamingJoinOperator} into a single object.
     */
    static class MiniBatchStreamingJoinParameter implements Serializable {
        // Row type information of the two inputs.
        InternalTypeInfo<RowData> leftType;
        InternalTypeInfo<RowData> rightType;
        // Generated join condition.
        GeneratedJoinCondition generatedJoinCondition;
        // Unique-key descriptions of the two inputs.
        JoinInputSideSpec leftInputSideSpec;
        JoinInputSideSpec rightInputSideSpec;
        // Outer-side flags as passed to the parent operator.
        boolean leftIsOuter;
        boolean rightIsOuter;
        // Null-key filter setting as passed to the parent operator.
        boolean[] filterNullKeys;
        // State retention times of the two sides.
        long leftStateRetentionTime;
        long rightStateRetentionTime;
        // Trigger that decides when a bundle is finished.
        CoBundleTrigger<RowData, RowData> coBundleTrigger;

        /** Stores every argument in the field of the same name. */
        MiniBatchStreamingJoinParameter(
                InternalTypeInfo<RowData> leftType,
                InternalTypeInfo<RowData> rightType,
                GeneratedJoinCondition generatedJoinCondition,
                JoinInputSideSpec leftInputSideSpec,
                JoinInputSideSpec rightInputSideSpec,
                boolean leftIsOuter,
                boolean rightIsOuter,
                boolean[] filterNullKeys,
                long leftStateRetentionTime,
                long rightStateRetentionTime,
                CoBundleTrigger<RowData, RowData> coBundleTrigger) {
            this.leftType = leftType;
            this.rightType = rightType;
            this.generatedJoinCondition = generatedJoinCondition;

            this.leftInputSideSpec = leftInputSideSpec;
            this.rightInputSideSpec = rightInputSideSpec;

            this.leftIsOuter = leftIsOuter;
            this.rightIsOuter = rightIsOuter;
            this.filterNullKeys = filterNullKeys;

            this.leftStateRetentionTime = leftStateRetentionTime;
            this.rightStateRetentionTime = rightStateRetentionTime;

            this.coBundleTrigger = coBundleTrigger;
        }
    }

    /** Mini-batch operator created by the factory for {@code FlinkJoinType.INNER}. */
    private static final class MiniBatchInnerJoinStreamOperator
            extends MiniBatchStreamingJoinOperator {

        public MiniBatchInnerJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the right buffer first, then the left buffer.
         *
         * @param leftBuffer records buffered from the left input
         * @param rightBuffer records buffered from the right input
         * @throws Exception if processing any record fails
         */
        @Override
        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
                throws Exception {
            // Right side goes first. NOTE(review): the order looks deliberate - confirm the reason
            // before changing it.
            processSingleSideBundles(rightBuffer, rightRecordStateView, leftRecordStateView, false);
            processSingleSideBundles(leftBuffer, leftRecordStateView, rightRecordStateView, true);
        }
    }

    /** Mini-batch operator created by the factory for {@code FlinkJoinType.LEFT}. */
    private static final class MiniBatchLeftOuterJoinStreamOperator
            extends MiniBatchStreamingJoinOperator {

        public MiniBatchLeftOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the right buffer first, then the left buffer.
         *
         * @param leftBuffer records buffered from the left input
         * @param rightBuffer records buffered from the right input
         * @throws Exception if processing any record fails
         */
        @Override
        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
                throws Exception {
            // Right side goes first. NOTE(review): the order looks deliberate - confirm the reason
            // before changing it.
            processSingleSideBundles(rightBuffer, rightRecordStateView, leftRecordStateView, false);
            processSingleSideBundles(leftBuffer, leftRecordStateView, rightRecordStateView, true);
        }
    }

    /** Mini-batch operator created by the factory for {@code FlinkJoinType.RIGHT}. */
    private static final class MiniBatchRightOuterJoinStreamOperator
            extends MiniBatchStreamingJoinOperator {

        public MiniBatchRightOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the left buffer first, then the right buffer. This is the reverse of the
         * order used by the other join-type operators in this file.
         *
         * @param leftBuffer records buffered from the left input
         * @param rightBuffer records buffered from the right input
         * @throws Exception if processing any record fails
         */
        @Override
        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
                throws Exception {
            // Left side goes first. NOTE(review): the order looks deliberate - confirm the reason
            // before changing it.
            processSingleSideBundles(leftBuffer, leftRecordStateView, rightRecordStateView, true);
            processSingleSideBundles(rightBuffer, rightRecordStateView, leftRecordStateView, false);
        }
    }

    /** Mini-batch operator created by the factory for {@code FlinkJoinType.FULL}. */
    private static final class MiniBatchFullOuterJoinStreamOperator
            extends MiniBatchStreamingJoinOperator {

        public MiniBatchFullOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the right buffer first, then the left buffer.
         *
         * @param leftBuffer records buffered from the left input
         * @param rightBuffer records buffered from the right input
         * @throws Exception if processing any record fails
         */
        @Override
        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
                throws Exception {
            // Right side goes first. NOTE(review): the order looks deliberate - confirm the reason
            // before changing it.
            processSingleSideBundles(rightBuffer, rightRecordStateView, leftRecordStateView, false);
            processSingleSideBundles(leftBuffer, leftRecordStateView, rightRecordStateView, true);
        }
    }
}

