/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.watermark.generalized;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.watermark.BoolWatermark;
import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.api.common.watermark.WatermarkCombinationFunction;
import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.api.common.watermark.WatermarkDeclarations;
import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.AlignedWatermarkCombiner;
import org.apache.flink.streaming.runtime.watermark.InternalBoolWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.InternalLongWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class WatermarkCombinerTest {
    private static final String DEFAULT_WATERMARK_IDENTIFIER = "default";
    private List<Watermark> receivedWatermarks = new ArrayList<Watermark>();
    private WatermarkCombiner combiner;

    WatermarkCombinerTest() {
    }

    @BeforeEach
    public void setup() {
        this.receivedWatermarks.clear();
        this.combiner = null;
    }

    @Test
    void testAlignedWatermarkCombiner() throws Exception {
        InternalLongWatermarkDeclaration watermarkDeclaration = new InternalLongWatermarkDeclaration(DEFAULT_WATERMARK_IDENTIFIER, new WatermarkCombinationPolicy((WatermarkCombinationFunction)WatermarkCombinationFunction.NumericWatermarkCombinationFunction.MIN, true), WatermarkHandlingStrategy.FORWARD, true);
        AtomicBoolean gateNotified = new AtomicBoolean(false);
        this.combiner = new AlignedWatermarkCombiner(2, () -> gateNotified.set(true));
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(Long.valueOf(1L)), 0, new Long[0]);
        Assertions.assertThat((boolean)gateNotified.get()).isFalse();
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(Long.valueOf(1L)), 1, 1L);
        Assertions.assertThat((boolean)gateNotified.get()).isTrue();
        gateNotified.set(false);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(Long.valueOf(2L)), 1, 1L);
        Assertions.assertThat((boolean)gateNotified.get()).isFalse();
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(Long.valueOf(2L)), 0, 1L, 2L);
        Assertions.assertThat((boolean)gateNotified.get()).isTrue();
    }

    @Test
    void testLongWatermarkCombinerWaitForAllChannels() throws Exception {
        LongWatermarkDeclaration watermarkDeclaration = WatermarkDeclarations.newBuilder((String)DEFAULT_WATERMARK_IDENTIFIER).typeLong().combineFunctionMax().combineWaitForAllChannels(true).build();
        InternalLongWatermarkDeclaration internalWatermarkDeclaration = (InternalLongWatermarkDeclaration)AbstractInternalWatermarkDeclaration.from((WatermarkDeclaration)watermarkDeclaration);
        this.combiner = internalWatermarkDeclaration.createWatermarkCombiner(2, null);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(3L), 0, new Long[0]);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 0, new Long[0]);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(1L), 1, 2L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(3L), 0, 2L, 3L);
    }

    @Test
    void testLongWatermarkCombinerCombineMax() throws Exception {
        LongWatermarkDeclaration watermarkDeclaration = WatermarkDeclarations.newBuilder((String)DEFAULT_WATERMARK_IDENTIFIER).typeLong().combineFunctionMax().build();
        InternalLongWatermarkDeclaration internalWatermarkDeclaration = (InternalLongWatermarkDeclaration)AbstractInternalWatermarkDeclaration.from((WatermarkDeclaration)watermarkDeclaration);
        this.combiner = internalWatermarkDeclaration.createWatermarkCombiner(2, null);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(1L), 0, 1L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 0, 1L, 2L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 1, 1L, 2L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(3L), 1, 1L, 2L, 3L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 1, 1L, 2L, 3L, 2L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 0, 1L, 2L, 3L, 2L);
    }

    @Test
    void testLongWatermarkCombinerCombineMin() throws Exception {
        LongWatermarkDeclaration watermarkDeclaration = WatermarkDeclarations.newBuilder((String)DEFAULT_WATERMARK_IDENTIFIER).typeLong().combineFunctionMin().build();
        InternalLongWatermarkDeclaration internalWatermarkDeclaration = (InternalLongWatermarkDeclaration)AbstractInternalWatermarkDeclaration.from((WatermarkDeclaration)watermarkDeclaration);
        this.combiner = internalWatermarkDeclaration.createWatermarkCombiner(2, null);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 0, 2L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(1L), 1, 2L, 1L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(3L), 0, 2L, 1L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(1L), 1, 2L, 1L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(2L), 1, 2L, 1L, 2L);
        this.executeAndCheckCombineStepWithLongWatermark(watermarkDeclaration.newWatermark(4L), 1, 2L, 1L, 2L, 3L);
    }

    @Test
    void testBoolWatermarkCombinerWaitForAllChannels() throws Exception {
        BoolWatermarkDeclaration watermarkDeclaration = WatermarkDeclarations.newBuilder((String)DEFAULT_WATERMARK_IDENTIFIER).typeBool().combineFunctionAND().combineWaitForAllChannels(true).build();
        InternalBoolWatermarkDeclaration internalWatermarkDeclaration = (InternalBoolWatermarkDeclaration)AbstractInternalWatermarkDeclaration.from((WatermarkDeclaration)watermarkDeclaration);
        this.combiner = internalWatermarkDeclaration.createWatermarkCombiner(2, null);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 0, new Boolean[0]);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 0, new Boolean[0]);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(false), 1, false);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 1, false, true);
    }

    @Test
    void testBoolWatermarkCombinerCombineAnd() throws Exception {
        BoolWatermarkDeclaration watermarkDeclaration = WatermarkDeclarations.newBuilder((String)DEFAULT_WATERMARK_IDENTIFIER).typeBool().combineFunctionAND().build();
        InternalBoolWatermarkDeclaration internalWatermarkDeclaration = (InternalBoolWatermarkDeclaration)AbstractInternalWatermarkDeclaration.from((WatermarkDeclaration)watermarkDeclaration);
        this.combiner = internalWatermarkDeclaration.createWatermarkCombiner(2, null);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 0, true);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(false), 1, true, false);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 1, true, false, true);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 0, true, false, true);
    }

    @Test
    void testBoolWatermarkCombinerCombineOr() throws Exception {
        BoolWatermarkDeclaration watermarkDeclaration = WatermarkDeclarations.newBuilder((String)DEFAULT_WATERMARK_IDENTIFIER).typeBool().combineFunctionOR().build();
        InternalBoolWatermarkDeclaration internalWatermarkDeclaration = (InternalBoolWatermarkDeclaration)AbstractInternalWatermarkDeclaration.from((WatermarkDeclaration)watermarkDeclaration);
        this.combiner = internalWatermarkDeclaration.createWatermarkCombiner(2, null);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 0, true);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(false), 1, true);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(true), 1, true);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(false), 1, true);
        this.executeAndCheckCombineStepWithBoolWatermark(watermarkDeclaration.newWatermark(false), 0, true, false);
    }

    private void executeAndCheckCombineStepWithLongWatermark(LongWatermark watermark, int channelIndex, Long ... expectedReceivedWatermarkValues) throws Exception {
        this.combiner.combineWatermark((Watermark)watermark, channelIndex, this.receivedWatermarks::add);
        Assertions.assertThat(this.receivedWatermarks.stream().map(w -> ((LongWatermark)w).getValue())).containsExactly((Object[])expectedReceivedWatermarkValues);
    }

    private void executeAndCheckCombineStepWithBoolWatermark(BoolWatermark watermark, int channelIndex, Boolean ... expectedReceivedWatermarkValues) throws Exception {
        this.combiner.combineWatermark((Watermark)watermark, channelIndex, this.receivedWatermarks::add);
        Assertions.assertThat(this.receivedWatermarks.stream().map(w -> ((BoolWatermark)w).getValue())).containsExactly((Object[])expectedReceivedWatermarkValues);
    }
}

