/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link CoStreamFlatMap}.
 *
 * <p>The tests drive the operator through a {@link TwoInputStreamOperatorTestHarness} and check:
 *
 * <ul>
 *   <li>that records from both inputs are passed through the user function and emitted with the
 *       timestamp of the record that produced them, and
 *   <li>that the {@code open}/{@code close} methods of a rich user function are invoked around
 *       element processing.
 * </ul>
 */
class CoStreamFlatMapTest
implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Feeds strings into input 1 and integers into input 2 and verifies the emitted records and
     * watermark against a hand-built expected queue.
     */
    @Test
    void testCoFlatMap() throws Exception {
        CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<>(new MyCoFlatMap());
        TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamOperatorTestHarness<>(operator);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        testHarness.open();

        // Input 1: three strings, with a watermark at initialTime + 2 after the second one.
        testHarness.processElement1(new StreamRecord<>("abc", initialTime + 1L));
        testHarness.processElement1(new StreamRecord<>("def", initialTime + 2L));
        testHarness.processWatermark1(new Watermark(initialTime + 2L));
        testHarness.processElement1(new StreamRecord<>("ghi", initialTime + 3L));

        // Input 2: five integers, with a watermark at initialTime + 3 after the second one.
        testHarness.processElement2(new StreamRecord<>(1, initialTime + 1L));
        testHarness.processElement2(new StreamRecord<>(2, initialTime + 2L));
        testHarness.processWatermark2(new Watermark(initialTime + 3L));
        testHarness.processElement2(new StreamRecord<>(3, initialTime + 3L));
        testHarness.processElement2(new StreamRecord<>(4, initialTime + 4L));
        testHarness.processElement2(new StreamRecord<>(5, initialTime + 5L));

        // Each input-1 string is split into single-character records carrying the input timestamp.
        expectedOutput.add(new StreamRecord<>("a", initialTime + 1L));
        expectedOutput.add(new StreamRecord<>("b", initialTime + 1L));
        expectedOutput.add(new StreamRecord<>("c", initialTime + 1L));
        expectedOutput.add(new StreamRecord<>("d", initialTime + 2L));
        expectedOutput.add(new StreamRecord<>("e", initialTime + 2L));
        expectedOutput.add(new StreamRecord<>("f", initialTime + 2L));
        expectedOutput.add(new StreamRecord<>("g", initialTime + 3L));
        expectedOutput.add(new StreamRecord<>("h", initialTime + 3L));
        expectedOutput.add(new StreamRecord<>("i", initialTime + 3L));

        // Each input-2 integer becomes its string form.
        expectedOutput.add(new StreamRecord<>("1", initialTime + 1L));
        expectedOutput.add(new StreamRecord<>("2", initialTime + 2L));
        // A single watermark is expected, at the position where the input-2 watermark was
        // processed; its value (initialTime + 2) is presumably the minimum of the two input
        // watermarks -- confirm against the operator's watermark handling.
        expectedOutput.add(new Watermark(initialTime + 2L));
        expectedOutput.add(new StreamRecord<>("3", initialTime + 3L));
        expectedOutput.add(new StreamRecord<>("4", initialTime + 4L));
        expectedOutput.add(new StreamRecord<>("5", initialTime + 5L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Verifies that a rich user function sees {@code open} before any element and {@code close}
     * after the harness is closed, and that the processed elements produced output.
     */
    @Test
    void testOpenClose() throws Exception {
        // The lifecycle flags are static, so clear them first; otherwise a previous run in the
        // same JVM would leave closeCalled == true and trip the assertion inside open().
        TestOpenCloseCoFlatMapFunction.openCalled = false;
        TestOpenCloseCoFlatMapFunction.closeCalled = false;

        CoStreamFlatMap<String, Integer, String> operator =
                new CoStreamFlatMap<>(new TestOpenCloseCoFlatMapFunction());
        TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamOperatorTestHarness<>(operator);
        long initialTime = 0L;
        testHarness.open();
        testHarness.processElement1(new StreamRecord<>("Hello", initialTime));
        testHarness.processElement2(new StreamRecord<>(42, initialTime));
        testHarness.close();

        // close() itself asserts that open() ran first, so checking closeCalled covers both.
        Assertions.assertThat(TestOpenCloseCoFlatMapFunction.closeCalled)
                .as("RichFunction methods where not called.")
                .isTrue();
        Assertions.assertThat(testHarness.getOutput()).isNotEmpty();
    }

    /**
     * User function under test: splits strings from input 1 into single-character strings and
     * converts integers from input 2 to their string representation.
     */
    private static final class MyCoFlatMap
    implements CoFlatMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;

        /** Emits one record per character of {@code value}; emits nothing for an empty string. */
        @Override
        public void flatMap1(String value, Collector<String> coll) {
            for (int i = 0; i < value.length(); ++i) {
                coll.collect(value.substring(i, i + 1));
            }
        }

        /** Emits {@code value.toString()} as a single record. */
        @Override
        public void flatMap2(Integer value, Collector<String> coll) {
            coll.collect(value.toString());
        }
    }

    /**
     * Rich user function that records its lifecycle calls in static flags and asserts that they
     * happen in the expected order (open, then elements, then close).
     */
    private static class TestOpenCloseCoFlatMapFunction
    extends RichCoFlatMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;
        /** Set to {@code true} once {@link #open(OpenContext)} has run. */
        public static boolean openCalled = false;
        /** Set to {@code true} once {@link #close()} has run. */
        public static boolean closeCalled = false;

        /** Asserts that close has not been called yet, then marks the function as opened. */
        @Override
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            Assertions.assertThat(closeCalled).as("Close called before open.").isFalse();
            openCalled = true;
        }

        /** Asserts that open has been called, then marks the function as closed. */
        @Override
        public void close() throws Exception {
            super.close();
            Assertions.assertThat(openCalled).as("Open was not called before close.").isTrue();
            closeCalled = true;
        }

        /** Asserts that open has been called and forwards {@code value} unchanged. */
        @Override
        public void flatMap1(String value, Collector<String> out) throws Exception {
            Assertions.assertThat(openCalled).as("Open was not called before run.").isTrue();
            out.collect(value);
        }

        /** Asserts that open has been called and forwards {@code value.toString()}. */
        @Override
        public void flatMap2(Integer value, Collector<String> out) throws Exception {
            Assertions.assertThat(openCalled).as("Open was not called before run.").isTrue();
            out.collect(value.toString());
        }
    }
}

