/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class CoBroadcastWithKeyedOperatorTest {
    private static final MapStateDescriptor<String, Integer> STATE_DESCRIPTOR = new MapStateDescriptor("broadcast-state", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);

    CoBroadcastWithKeyedOperatorTest() {
    }

    @Test
    void testKeyQuerying() throws Exception {
        class KeyQueryingProcessFunction
        extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, String> {
            KeyQueryingProcessFunction() {
            }

            public void processElement(Tuple2<Integer, String> value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                Assertions.assertThat((Integer)((Integer)ctx.getCurrentKey())).isEqualTo(value.f0);
                out.collect((Object)((String)value.f1));
            }

            public void processBroadcastElement(String value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            }
        }
        CoBroadcastWithKeyedOperator operator = new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)new KeyQueryingProcessFunction(), Collections.emptyList());
        try (KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness(operator, (KeySelector & Serializable)in -> (Integer)in.f0, null, BasicTypeInfo.INT_TYPE_INFO);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement1(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5"), 12L));
            testHarness.processElement1(new StreamRecord((Object)Tuple2.of((Object)42, (Object)"42"), 13L));
            ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
            expectedOutput.add(new StreamRecord((Object)"5", 12L));
            expectedOutput.add(new StreamRecord((Object)"42", 13L));
            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testAccessToKeyedStateIt() throws Exception {
        ArrayList<String> test1content = new ArrayList<String>();
        test1content.add("test1");
        test1content.add("test1");
        ArrayList<String> test2content = new ArrayList<String>();
        test2content.add("test2");
        test2content.add("test2");
        test2content.add("test2");
        test2content.add("test2");
        ArrayList<String> test3content = new ArrayList<String>();
        test3content.add("test3");
        test3content.add("test3");
        test3content.add("test3");
        HashMap<String, List<String>> expectedState = new HashMap<String, List<String>>();
        expectedState.put("test1", test1content);
        expectedState.put("test2", test2content);
        expectedState.put("test3", test3content);
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new StatefulFunctionWithKeyedStateAccessedOnBroadcast(expectedState));){
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test1", 12L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test1", 12L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test2", 13L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test2", 13L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test2", 13L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test3", 14L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test3", 14L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test3", 14L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"test2", 13L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)1, 13L));
        }
    }

    @Test
    void testFunctionWithTimer() throws Exception {
        String expectedKey = "6";
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new FunctionWithTimerOnKeyed(41L, "6"));){
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)5, 12L));
            testHarness.processWatermark1(new Watermark(40L));
            testHarness.processWatermark2(new Watermark(40L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"6", 13L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"6", 15L));
            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
            expectedOutput.add(new Watermark(10L));
            expectedOutput.add(new StreamRecord((Object)"BR:5 WM:10 TS:12", 12L));
            expectedOutput.add(new Watermark(40L));
            expectedOutput.add(new StreamRecord((Object)"NON-BR:6 WM:40 TS:13", 13L));
            expectedOutput.add(new StreamRecord((Object)"NON-BR:6 WM:40 TS:15", 15L));
            expectedOutput.add(new StreamRecord((Object)"TIMER:41", 41L));
            expectedOutput.add(new Watermark(50L));
            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testSideOutput() throws Exception {
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new FunctionWithSideOutput());){
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)5, 12L));
            testHarness.processWatermark1(new Watermark(40L));
            testHarness.processWatermark2(new Watermark(40L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"6", 13L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"6", 15L));
            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));
            ConcurrentLinkedQueue<StreamRecord> expectedBr = new ConcurrentLinkedQueue<StreamRecord>();
            expectedBr.add(new StreamRecord((Object)"BR:5 WM:10 TS:12", 12L));
            ConcurrentLinkedQueue<StreamRecord> expectedNonBr = new ConcurrentLinkedQueue<StreamRecord>();
            expectedNonBr.add(new StreamRecord((Object)"NON-BR:6 WM:40 TS:13", 13L));
            expectedNonBr.add(new StreamRecord((Object)"NON-BR:6 WM:40 TS:15", 15L));
            TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedBr, testHarness.getSideOutput(FunctionWithSideOutput.BROADCAST_TAG));
            TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedNonBr, testHarness.getSideOutput(FunctionWithSideOutput.NON_BROADCAST_TAG));
        }
    }

    @Test
    void testFunctionWithBroadcastState() throws Exception {
        HashMap<String, Integer> expectedBroadcastState = new HashMap<String, Integer>();
        expectedBroadcastState.put("5.key", 5);
        expectedBroadcastState.put("34.key", 34);
        expectedBroadcastState.put("53.key", 53);
        expectedBroadcastState.put("12.key", 12);
        expectedBroadcastState.put("98.key", 98);
        String expectedKey = "trigger";
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new FunctionWithBroadcastState("key", expectedBroadcastState, 41L, "trigger"));){
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)5, 10L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)34, 12L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)53, 15L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)12, 16L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)98, 19L));
            testHarness.processElement1((StreamRecord<String>)new StreamRecord((Object)"trigger", 13L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)51, 21L));
            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));
            ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
            Assertions.assertThat(output).hasSize(3);
            Object firstRawWm = output.poll();
            Assertions.assertThat(firstRawWm).isInstanceOf(Watermark.class);
            Watermark firstWm = (Watermark)firstRawWm;
            Assertions.assertThat((long)firstWm.getTimestamp()).isEqualTo(10L);
            Object rawOutputElem = output.poll();
            Assertions.assertThat(rawOutputElem).isInstanceOf(StreamRecord.class);
            StreamRecord outputRec = (StreamRecord)rawOutputElem;
            Assertions.assertThat((Object)outputRec.getValue()).isInstanceOf(String.class);
            String outputElem = (String)outputRec.getValue();
            expectedBroadcastState.put("51.key", 51);
            ArrayList<Map.Entry<String, Integer>> expectedEntries = new ArrayList<Map.Entry<String, Integer>>();
            expectedEntries.addAll(expectedBroadcastState.entrySet());
            String expected = "TS:41 " + CoBroadcastWithKeyedOperatorTest.mapToString(expectedEntries);
            Assertions.assertThat((String)outputElem).isEqualTo(expected);
            Object secondRawWm = output.poll();
            Assertions.assertThat(secondRawWm).isInstanceOf(Watermark.class);
            Watermark secondWm = (Watermark)secondRawWm;
            Assertions.assertThat((long)secondWm.getTimestamp()).isEqualTo(50L);
        }
    }

    @Test
    void testScaleUp() throws Exception {
        OperatorSubtaskState mergedSnapshot;
        HashSet<String> keysToRegister = new HashSet<String>();
        keysToRegister.add("test1");
        keysToRegister.add("test2");
        keysToRegister.add("test3");
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 2, 0);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 2, 1);){
            testHarness1.processElement2((StreamRecord<Integer>)new StreamRecord((Object)3));
            testHarness2.processElement2((StreamRecord<Integer>)new StreamRecord((Object)3));
            mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L));
        }
        HashSet<String> expected = new HashSet<String>(3);
        expected.add("test1=3");
        expected.add("test2=3");
        expected.add("test3=3");
        OperatorSubtaskState operatorSubtaskState1 = CoBroadcastWithKeyedOperatorTest.repartitionInitState(mergedSnapshot, 10, 2, 3, 0);
        OperatorSubtaskState operatorSubtaskState2 = CoBroadcastWithKeyedOperatorTest.repartitionInitState(mergedSnapshot, 10, 2, 3, 1);
        OperatorSubtaskState operatorSubtaskState3 = CoBroadcastWithKeyedOperatorTest.repartitionInitState(mergedSnapshot, 10, 2, 3, 2);
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 3, 0, operatorSubtaskState1);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 3, 1, operatorSubtaskState2);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness3 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 3, 2, operatorSubtaskState3);){
            StreamRecord rec;
            testHarness1.processElement1((StreamRecord<String>)new StreamRecord((Object)this.findValidTriggerKey(testHarness1)));
            testHarness2.processElement1((StreamRecord<String>)new StreamRecord((Object)this.findValidTriggerKey(testHarness2)));
            testHarness3.processElement1((StreamRecord<String>)new StreamRecord((Object)this.findValidTriggerKey(testHarness3)));
            ConcurrentLinkedQueue<Object> output1 = testHarness1.getOutput();
            ConcurrentLinkedQueue<Object> output2 = testHarness2.getOutput();
            ConcurrentLinkedQueue<Object> output3 = testHarness3.getOutput();
            Assertions.assertThat(output1).hasSameSizeAs(expected);
            for (Object e : output1) {
                rec = (StreamRecord)e;
                Assertions.assertThat((String)((String)rec.getValue())).isIn(expected);
            }
            Assertions.assertThat(output2).hasSameSizeAs(expected);
            for (Object e : output2) {
                rec = (StreamRecord)e;
                Assertions.assertThat((String)((String)rec.getValue())).isIn(expected);
            }
            Assertions.assertThat(output3).hasSameSizeAs(expected);
            for (Object e : output3) {
                rec = (StreamRecord)e;
                Assertions.assertThat((String)((String)rec.getValue())).isIn(expected);
            }
        }
    }

    @Test
    void testScaleDown() throws Exception {
        OperatorSubtaskState mergedSnapshot;
        HashSet<String> keysToRegister = new HashSet<String>();
        keysToRegister.add("test1");
        keysToRegister.add("test2");
        keysToRegister.add("test3");
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 3, 0);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 3, 1);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness3 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 3, 2);){
            testHarness1.processElement2((StreamRecord<Integer>)new StreamRecord((Object)3));
            testHarness2.processElement2((StreamRecord<Integer>)new StreamRecord((Object)3));
            testHarness3.processElement2((StreamRecord<Integer>)new StreamRecord((Object)3));
            mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L), testHarness3.snapshot(0L, 0L));
        }
        HashSet<String> expected = new HashSet<String>(3);
        expected.add("test1=3");
        expected.add("test2=3");
        expected.add("test3=3");
        OperatorSubtaskState operatorSubtaskState1 = CoBroadcastWithKeyedOperatorTest.repartitionInitState(mergedSnapshot, 10, 3, 2, 0);
        OperatorSubtaskState operatorSubtaskState2 = CoBroadcastWithKeyedOperatorTest.repartitionInitState(mergedSnapshot, 10, 3, 2, 1);
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 2, 0, operatorSubtaskState1);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new TestFunctionWithOutput(keysToRegister), 10, 2, 1, operatorSubtaskState2);){
            StreamRecord rec;
            testHarness1.processElement1((StreamRecord<String>)new StreamRecord((Object)this.findValidTriggerKey(testHarness1)));
            testHarness2.processElement1((StreamRecord<String>)new StreamRecord((Object)this.findValidTriggerKey(testHarness2)));
            ConcurrentLinkedQueue<Object> output1 = testHarness1.getOutput();
            ConcurrentLinkedQueue<Object> output2 = testHarness2.getOutput();
            Assertions.assertThat(output1).hasSameSizeAs(expected);
            for (Object e : output1) {
                rec = (StreamRecord)e;
                Assertions.assertThat((String)((String)rec.getValue())).isIn(expected);
            }
            Assertions.assertThat(output2).hasSameSizeAs(expected);
            for (Object e : output2) {
                rec = (StreamRecord)e;
                Assertions.assertThat((String)((String)rec.getValue())).isIn(expected);
            }
        }
    }

    private String findValidTriggerKey(AbstractStreamOperatorTestHarness<?> harness) {
        int subtask = harness.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
        int maxParallelism = harness.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
        int parallelism = harness.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
        int element = 0;
        while (KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)Integer.toString(element), (int)maxParallelism, (int)parallelism) != subtask) {
            ++element;
        }
        return Integer.toString(element);
    }

    @Test
    void testNoKeyedStateOnBroadcastSide() throws Exception {
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector(), new KeyedBroadcastProcessFunction<String, String, Integer, String>(){
            private static final long serialVersionUID = -1725365436500098384L;
            private final ValueStateDescriptor<String> valueState = new ValueStateDescriptor("any", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);

            public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
                ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.getRuntimeContext().getState(this.valueState).value()).isInstanceOf(NullPointerException.class)).hasMessage("No key set. This method should not be called outside of a keyed context.");
            }

            public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            }
        });){
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2((StreamRecord<Integer>)new StreamRecord((Object)5, 12L));
        }
    }

    private static <KEY, IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(TypeInformation<KEY> keyTypeInfo, KeySelector<IN1, KEY> keyKeySelector, KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) throws Exception {
        return CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(keyTypeInfo, keyKeySelector, function, 1, 1, 0);
    }

    private static <KEY, IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(TypeInformation<KEY> keyTypeInfo, KeySelector<IN1, KEY> keyKeySelector, KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function, int maxParallelism, int numTasks, int taskIdx) throws Exception {
        return CoBroadcastWithKeyedOperatorTest.getInitializedTestHarness(keyTypeInfo, keyKeySelector, function, maxParallelism, numTasks, taskIdx, null);
    }

    private static OperatorSubtaskState repartitionInitState(OperatorSubtaskState initState, int numKeyGroups, int oldParallelism, int newParallelism, int subtaskIndex) {
        return AbstractStreamOperatorTestHarness.repartitionOperatorState(initState, numKeyGroups, oldParallelism, newParallelism, subtaskIndex);
    }

    private static <KEY, IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(TypeInformation<KEY> keyTypeInfo, KeySelector<IN1, KEY> keyKeySelector, KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function, int maxParallelism, int numTasks, int taskIdx, OperatorSubtaskState initState) throws Exception {
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Collections.singletonList(STATE_DESCRIPTOR)), keyKeySelector, null, keyTypeInfo, maxParallelism, numTasks, taskIdx);
        testHarness.setup();
        testHarness.initializeState(initState);
        testHarness.open();
        return testHarness;
    }

    private static String mapToString(List<Map.Entry<String, Integer>> entries) {
        entries.sort(Comparator.comparing(Map.Entry::getKey).thenComparingInt(Map.Entry::getValue));
        StringBuilder builder = new StringBuilder();
        for (Map.Entry<String, Integer> entry : entries) {
            builder.append(' ').append(entry.getKey()).append('=').append(entry.getValue());
        }
        return builder.toString();
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final ListStateDescriptor<String> listStateDesc = new ListStateDescriptor("listStateTest", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        private final Map<String, List<String>> expectedKeyedStates;

        StatefulFunctionWithKeyedStateAccessedOnBroadcast(Map<String, List<String>> expectedKeyedState) {
            this.expectedKeyedStates = (Map)Preconditions.checkNotNull(expectedKeyedState);
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.applyToKeyedState(this.listStateDesc, (KeyedStateFunction)new KeyedStateFunction<String, ListState<String>>(){

                public void process(String key, ListState<String> state) throws Exception {
                    Iterator it = ((Iterable)state.get()).iterator();
                    ArrayList<String> list = new ArrayList<String>();
                    while (it.hasNext()) {
                        list.add((String)it.next());
                    }
                    Assertions.assertThat(list).isEqualTo(expectedKeyedStates.get(key));
                }
            });
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            this.getRuntimeContext().getListState(this.listStateDesc).add((Object)value);
        }
    }

    private static class FunctionWithTimerOnKeyed
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final long timerTS;
        private final String expectedKey;

        FunctionWithTimerOnKeyed(long timerTS, String expectedKey) {
            this.timerTS = timerTS;
            this.expectedKey = expectedKey;
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerEventTimeTimer(this.timerTS);
            out.collect((Object)("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, KeyedBroadcastProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((String)((String)ctx.getCurrentKey())).isEqualTo(this.expectedKey);
            out.collect((Object)("TIMER:" + timestamp));
        }
    }

    private static class FunctionWithSideOutput
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        static final OutputTag<String> BROADCAST_TAG = new OutputTag<String>("br-out"){
            private static final long serialVersionUID = -6899484480421899631L;
        };
        static final OutputTag<String> NON_BROADCAST_TAG = new OutputTag<String>("non-br-out"){
            private static final long serialVersionUID = 3837387110613831791L;
        };

        private FunctionWithSideOutput() {
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.output(BROADCAST_TAG, (Object)("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ctx.output(NON_BROADCAST_TAG, (Object)("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }
    }

    private static class FunctionWithBroadcastState
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final String keyPostfix;
        private final Map<String, Integer> expectedBroadcastState;
        private final long timerTs;
        private final String expectedKey;

        FunctionWithBroadcastState(String keyPostfix, Map<String, Integer> expectedBroadcastState, long timerTs, String expectedKey) {
            this.keyPostfix = (String)Preconditions.checkNotNull((Object)keyPostfix);
            this.expectedBroadcastState = (Map)Preconditions.checkNotNull(expectedBroadcastState);
            this.timerTs = timerTs;
            this.expectedKey = expectedKey;
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            String key = value + "." + this.keyPostfix;
            ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)key, (Object)value);
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            Iterable broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
            Iterator iter = broadcastStateIt.iterator();
            for (int i = 0; i < this.expectedBroadcastState.size(); ++i) {
                Assertions.assertThat(iter).hasNext();
                Map.Entry entry = (Map.Entry)iter.next();
                Assertions.assertThat(this.expectedBroadcastState).containsEntry((Object)((String)entry.getKey()), (Object)((Integer)entry.getValue()));
            }
            Assertions.assertThat(iter).isExhausted();
            ctx.timerService().registerEventTimeTimer(this.timerTs);
        }

        public void onTimer(long timestamp, KeyedBroadcastProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Iterator iter = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries().iterator();
            ArrayList<Map.Entry<String, Integer>> map = new ArrayList<Map.Entry<String, Integer>>();
            while (iter.hasNext()) {
                map.add((Map.Entry)iter.next());
            }
            Assertions.assertThat((String)((String)ctx.getCurrentKey())).isEqualTo(this.expectedKey);
            String mapToStr = CoBroadcastWithKeyedOperatorTest.mapToString(map);
            out.collect((Object)("TS:" + timestamp + " " + mapToStr));
        }
    }

    private static class TestFunctionWithOutput
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final Set<String> keysToRegister;

        TestFunctionWithOutput(Set<String> keysToRegister) {
            this.keysToRegister = (Set)Preconditions.checkNotNull(keysToRegister);
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            for (String k : this.keysToRegister) {
                ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)k, (Object)value);
            }
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            for (Map.Entry entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
                out.collect((Object)entry.toString());
            }
        }
    }
}

