/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.operator.CepOperator;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.cep.utils.CepOperatorBuilder;
import org.apache.flink.cep.utils.CepOperatorTestUtilities;
import org.apache.flink.cep.utils.EventBuilder;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CEPOperatorTest
extends TestLogger {
    /**
     * JUnit rule providing temporary directories; the RocksDB-backed tests in this class call
     * {@code newFolder()} on it to obtain a DB storage path.
     */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** JUnit {@code @After} hook that delegates to {@link Mockito#validateMockitoUsage()}. */
    @After
    public void validate() {
        Mockito.validateMockitoUsage();
    }

    /**
     * Sends one watermark with timestamp 42 through the harness returned by
     * {@code getCepTestHarness(false)} and passes the first polled output element to
     * {@code verifyWatermark} together with that timestamp.
     */
    @Test
    public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
        final long watermarkTimestamp = 42L;
        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> testHarness =
                this.getCepTestHarness(false)) {
            testHarness.open();

            testHarness.processWatermark(new Watermark(watermarkTimestamp));

            Object forwarded = testHarness.getOutput().poll();
            this.verifyWatermark(forwarded, watermarkTimestamp);
        }
    }

    /**
     * Wraps a compiled NFA in a Mockito spy, sets the harness processing time to 5, processes one
     * event whose own timestamp is 3, and verifies that {@code NFA#process} was invoked with that
     * event and with timestamp 5 (the processing time, not the record timestamp).
     */
    @Test
    public void testProcessingTimestampisPassedToNFA() throws Exception {
        final long processingTime = 5L;
        NFA<Event> compiledNfa =
                NFACompiler.compileFactory(Pattern.<Event>begin("begin"), true).createNFA();
        NFA<Event> nfaSpy = Mockito.spy(compiledNfa);

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(
                        CepOperatorBuilder.createOperatorForNFA(nfaSpy).build())) {
            testHarness.open();
            testHarness.setProcessingTime(processingTime);

            StreamRecord<Event> input = EventBuilder.event().withTimestamp(3L).asStreamRecord();
            testHarness.processElement(input);

            Mockito.verify(nfaSpy)
                    .process(
                            ArgumentMatchers.any(SharedBufferAccessor.class),
                            ArgumentMatchers.any(NFAState.class),
                            ArgumentMatchers.eq(input.getValue()),
                            ArgumentMatchers.eq(processingTime),
                            ArgumentMatchers.any(AfterMatchSkipStrategy.class),
                            ArgumentMatchers.any(TimerService.class));
        }
    }

    /**
     * Drives the operator through two snapshot/restore cycles: elements are processed, the
     * harness is snapshotted and closed, and a fresh harness is initialized from the snapshot
     * before processing continues. After the second restore a {@code Long.MAX_VALUE} watermark is
     * sent; the output must then hold exactly two elements, which are checked with
     * {@code verifyPattern} (start, middle, end) and {@code verifyWatermark}.
     */
    @Test
    public void testKeyedCEPOperatorCheckpointing() throws Exception {
        // Plain local plus try/finally: the harness is replaced after every snapshot, and a
        // try-with-resources variable is implicitly final and cannot be reassigned.
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                this.getCepTestHarness(false);
        try {
            harness.open();

            Event startEvent = new Event(42, "start", 1.0);
            SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
            Event endEvent = new Event(42, "end", 1.0);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2L));

            // First snapshot/restore cycle.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            harness = this.getCepTestHarness(false);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processWatermark(new Watermark(Long.MIN_VALUE));
            harness.processElement(
                    new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
            harness.processWatermark(new Watermark(2L));
            harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 5L));

            // Second snapshot/restore cycle.
            OperatorSubtaskState snapshot2 = harness.snapshot(1L, 1L);
            harness.close();

            harness = this.getCepTestHarness(false);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processWatermark(new Watermark(Long.MAX_VALUE));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(2L, result.size());
            this.verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
            this.verifyWatermark(result.poll(), Long.MAX_VALUE);
        } finally {
            // Closes whichever harness is current, also when a step above throws.
            harness.close();
        }
    }

    /**
     * Same snapshot/restore scenario as the plain checkpointing test, but every harness is given
     * an {@link EmbeddedRocksDBStateBackend} whose DB storage path is a folder created by
     * {@link #tempFolder}. The final output must hold exactly two elements, checked with
     * {@code verifyPattern} (start, middle, end) and {@code verifyWatermark}.
     */
    @Test
    public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
        String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
        EmbeddedRocksDBStateBackend rocksDBStateBackend =
                new EmbeddedRocksDBStateBackend(TernaryBoolean.FALSE);
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        // Plain local plus try/finally: the harness is replaced after every snapshot, and a
        // try-with-resources variable is implicitly final and cannot be reassigned.
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                this.getCepTestHarness(false);
        try {
            harness.setStateBackend(rocksDBStateBackend);
            harness.setCheckpointStorage(new JobManagerCheckpointStorage());
            harness.open();

            Event startEvent = new Event(42, "start", 1.0);
            SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
            Event endEvent = new Event(42, "end", 1.0);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2L));

            // First snapshot/restore cycle, with a new backend on the same storage path.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            harness = this.getCepTestHarness(false);
            rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processWatermark(new Watermark(Long.MIN_VALUE));
            harness.processElement(
                    new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
            harness.processWatermark(new Watermark(2L));

            // Second snapshot/restore cycle.
            OperatorSubtaskState snapshot2 = harness.snapshot(1L, 1L);
            harness.close();

            harness = this.getCepTestHarness(false);
            rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 5L));
            harness.processWatermark(new Watermark(Long.MAX_VALUE));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(2L, result.size());
            this.verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
            this.verifyWatermark(result.poll(), Long.MAX_VALUE);
        } finally {
            // Closes whichever harness is current, also when a step above throws.
            harness.close();
        }
    }

    /**
     * Processes a single start event (timestamp 3) and then only advances the watermark, first to
     * 5 and then to 13. Expects both watermarks on the main output, and exactly one record on the
     * {@code timedOut} side output whose tuple holds the map {@code {"start": [startEvent]}} and
     * the timestamp 13.
     */
    @Test
    public void testKeyedAdvancingTimeWithoutElements() throws Exception {
        final Event startEvent = new Event(42, "start", 1.0);
        final long watermarkTimestamp1 = 5L;
        final long watermarkTimestamp2 = 13L;

        final Map<String, List<Event>> expectedSequence = new HashMap<>(2);
        expectedSequence.put("start", Collections.singletonList(startEvent));

        // Anonymous subclass of OutputTag, as in the original declaration.
        final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOut =
                new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timedOut") {};

        final CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                new CepOperator<>(
                        Event.createTypeSerializer(),
                        false,
                        new NFAFactory(true),
                        null,
                        null,
                        new TimedOutProcessFunction(timedOut),
                        null);

        final KeySelector<Event, Integer> keySelector =
                new KeySelector<Event, Integer>() {
                    private static final long serialVersionUID = 7219185117566268366L;

                    @Override
                    public Integer getKey(Event value) throws Exception {
                        return value.getId();
                    }
                };

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>>
                harness =
                        new KeyedOneInputStreamOperatorTestHarness<>(
                                operator, keySelector, BasicTypeInfo.INT_TYPE_INFO)) {
            String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
            EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);

            harness.setStateBackend(rocksDBStateBackend);
            harness.setCheckpointStorage(new JobManagerCheckpointStorage());

            // Map.class carries no type arguments; the double cast only satisfies the compiler.
            @SuppressWarnings("unchecked")
            Class<Map<String, List<Event>>> outputClass =
                    (Class<Map<String, List<Event>>>) (Object) Map.class;
            harness.setup(new KryoSerializer<>(outputClass, new SerializerConfigImpl()));
            harness.open();

            harness.processElement(new StreamRecord<>(startEvent, 3L));
            harness.processWatermark(new Watermark(watermarkTimestamp1));
            harness.processWatermark(new Watermark(watermarkTimestamp2));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            ConcurrentLinkedQueue<StreamRecord<Tuple2<Map<String, List<Event>>, Long>>>
                    sideOutput = harness.getSideOutput(timedOut);

            Assert.assertEquals(2L, result.size());
            Assert.assertEquals(1L, sideOutput.size());

            Object watermark1 = result.poll();
            Assert.assertTrue(watermark1 instanceof Watermark);
            Assert.assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());

            Tuple2<Map<String, List<Event>>, Long> leftResult = sideOutput.poll().getValue();
            Assert.assertEquals(watermarkTimestamp2, (long) leftResult.f1);
            Assert.assertEquals(expectedSequence, leftResult.f0);

            Object watermark2 = result.poll();
            Assert.assertTrue(watermark2 instanceof Watermark);
            Assert.assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
        }
    }

    /**
     * Processes events "c", "d", "a", "b" for key 42 with a snapshot/restore (new operator and
     * new harness) after "c" and again after "d". Expects exactly one output element, checked
     * with {@code verifyPattern} against the "c", "a", "b" events.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdate() throws Exception {
        CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());

        // Plain local plus try/finally: the harness is replaced after every snapshot, and a
        // try-with-resources variable is implicitly final and cannot be reassigned.
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();

            Event startEvent = new Event(42, "c", 1.0);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
            Event endEvent = new Event(42, "b", 1.0);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));

            // First snapshot/restore cycle.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            operator = CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());
            harness = CepOperatorTestUtilities.getCepTestHarness(operator);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0), 4L));

            // Second snapshot/restore cycle (same checkpoint id/timestamp as the first).
            OperatorSubtaskState snapshot2 = harness.snapshot(0L, 0L);
            harness.close();

            operator = CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());
            harness = CepOperatorTestUtilities.getCepTestHarness(operator);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 4L));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(1L, result.size());
            this.verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            // Closes whichever harness is current, also when a step above throws.
            harness.close();
        }
    }

    /**
     * RocksDB variant of the NFA-update test: events "c", "d", "a", "b" for key 42 are processed
     * with a snapshot/restore after "c" and after "d", each harness using an
     * {@link EmbeddedRocksDBStateBackend} on the same temp-folder storage path. Expects exactly
     * one output element, checked with {@code verifyPattern} against the "c", "a", "b" events.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdateWithRocksDB() throws Exception {
        String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
        EmbeddedRocksDBStateBackend rocksDBStateBackend =
                new EmbeddedRocksDBStateBackend(TernaryBoolean.FALSE);
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());

        // Plain local plus try/finally: the harness is replaced after every snapshot, and a
        // try-with-resources variable is implicitly final and cannot be reassigned.
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.setStateBackend(rocksDBStateBackend);
            harness.setCheckpointStorage(new JobManagerCheckpointStorage());
            harness.open();

            Event startEvent = new Event(42, "c", 1.0);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
            Event endEvent = new Event(42, "b", 1.0);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));

            // First snapshot/restore cycle, with a new backend on the same storage path.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            operator = CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());
            harness = CepOperatorTestUtilities.getCepTestHarness(operator);
            rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0), 4L));

            // Second snapshot/restore cycle (same checkpoint id/timestamp as the first).
            OperatorSubtaskState snapshot2 = harness.snapshot(0L, 0L);
            harness.close();

            operator = CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());
            harness = CepOperatorTestUtilities.getCepTestHarness(operator);
            rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 4L));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(1L, result.size());
            this.verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            // Closes whichever harness is current, also when a step above throws.
            harness.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the operator's {@code computationStates} field with a Mockito spy, processes four
     * events ("c", "d", "a", "b") for key 42, and verifies that {@code update} was called on the
     * spied state exactly three times. Also expects a single output element, checked with
     * {@code verifyPattern}.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception {
        CepOperator<Event, Integer, Map<String, List<Event>>> cepOperator =
                CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(cepOperator)) {
            testHarness.open();

            // Swap the state handle for a spy only after open().
            final String stateField = "computationStates";
            ValueState originalState = (ValueState) Whitebox.getInternalState(cepOperator, stateField);
            ValueState stateSpy = Mockito.spy(originalState);
            Whitebox.setInternalState(cepOperator, stateField, stateSpy);

            Event first = new Event(42, "c", 1.0);
            SubEvent second = new SubEvent(42, "a", 1.0, 10.0);
            Event third = new Event(42, "b", 1.0);

            testHarness.processElement(new StreamRecord<Event>(first, 1L));
            testHarness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0), 4L));
            testHarness.processElement(new StreamRecord<Event>(second, 4L));
            testHarness.processElement(new StreamRecord<Event>(third, 4L));

            Mockito.verify(stateSpy, Mockito.times(3)).update(Mockito.any());

            ConcurrentLinkedQueue<Object> emitted = testHarness.getOutput();
            Assert.assertEquals(1L, emitted.size());
            this.verifyPattern(emitted.poll(), first, second, third);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * RocksDB variant of the update-count test: with an {@link EmbeddedRocksDBStateBackend} set
     * on the harness, the operator's {@code computationStates} field is replaced by a Mockito
     * spy, four events ("c", "d", "a", "b") are processed for key 42, and {@code update} must
     * have been called on the spy exactly three times. Also expects a single output element,
     * checked with {@code verifyPattern}.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdateTimesWithRocksDB() throws Exception {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        String storagePath = this.tempFolder.newFolder().getAbsolutePath();
        backend.setDbStoragePath(storagePath);

        CepOperator<Event, Integer, Map<String, List<Event>>> cepOperator =
                CepOperatorTestUtilities.getKeyedCepOperator(true, new SimpleNFAFactory());

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(cepOperator)) {
            testHarness.setStateBackend(backend);
            testHarness.setCheckpointStorage(new JobManagerCheckpointStorage());
            testHarness.open();

            // Swap the state handle for a spy only after open().
            final String stateField = "computationStates";
            ValueState originalState = (ValueState) Whitebox.getInternalState(cepOperator, stateField);
            ValueState stateSpy = Mockito.spy(originalState);
            Whitebox.setInternalState(cepOperator, stateField, stateSpy);

            Event first = new Event(42, "c", 1.0);
            SubEvent second = new SubEvent(42, "a", 1.0, 10.0);
            Event third = new Event(42, "b", 1.0);

            testHarness.processElement(new StreamRecord<Event>(first, 1L));
            testHarness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0), 4L));
            testHarness.processElement(new StreamRecord<Event>(second, 4L));
            testHarness.processElement(new StreamRecord<Event>(third, 4L));

            Mockito.verify(stateSpy, Mockito.times(3)).update(Mockito.any());

            ConcurrentLinkedQueue<Object> emitted = testHarness.getOutput();
            Assert.assertEquals(1L, emitted.size());
            this.verifyPattern(emitted.poll(), first, second, third);
        }
    }

    /**
     * Event-time cleanup scenario for keys 42 and 43. Elements are fed out of order, watermarks
     * are advanced step by step, and after each step the test asserts the number of registered
     * event-time timers, the per-key priority-queue size ({@code getPQSize} /
     * {@code hasNonEmptyPQ}) and whether the per-key shared buffer is non-empty
     * ({@code hasNonEmptySharedBuffer}). The operator is snapshotted and restored into a second
     * operator midway. At the end no timers remain and key 42 has neither queued elements nor
     * shared-buffer content.
     */
    @Test
    public void testCEPOperatorCleanupEventTime() throws Exception {
        Event startEvent1 = new Event(42, "start", 1.0);
        Event startEvent2 = new Event(42, "start", 2.0);
        SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
        SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
        SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
        Event endEvent1 = new Event(42, "end", 1.0);
        Event endEvent2 = new Event(42, "end", 2.0);

        Event startEventK2 = new Event(43, "start", 1.0);

        CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                this.getKeyedCepOperator(false);

        // Plain local plus try/finally: the harness is replaced after the snapshot, and a
        // try-with-resources variable is implicitly final and cannot be reassigned.
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();

            harness.processWatermark(new Watermark(Long.MIN_VALUE));

            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2L));
            harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
            harness.processElement(
                    new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
            harness.processElement(new StreamRecord<Event>(startEvent1, 1L));
            harness.processElement(new StreamRecord<Event>(startEventK2, 1L));

            // Nothing has been handed to the NFA yet: elements are queued per key.
            Assert.assertEquals(4L, harness.numEventTimeTimers());
            Assert.assertEquals(4L, operator.getPQSize(42));
            Assert.assertEquals(1L, operator.getPQSize(43));
            Assert.assertFalse(operator.hasNonEmptySharedBuffer(42));
            Assert.assertFalse(operator.hasNonEmptySharedBuffer(43));

            harness.processWatermark(new Watermark(2L));

            this.verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
            this.verifyWatermark(harness.getOutput().poll(), 2L);

            Assert.assertEquals(4L, harness.numEventTimeTimers());
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(42));
            Assert.assertEquals(1L, operator.getPQSize(42));
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(43));
            Assert.assertFalse(operator.hasNonEmptyPQ(43));

            harness.processElement(new StreamRecord<Event>(startEvent2, 4L));
            harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));

            // Snapshot, then continue on a fresh operator/harness restored from it.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            CepOperator<Event, Integer, Map<String, List<Event>>> operator2 =
                    this.getKeyedCepOperator(false);
            harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(endEvent1, 6L));
            harness.processWatermark(11L);
            harness.processWatermark(12L);

            Assert.assertEquals(3L, harness.numEventTimeTimers());
            Assert.assertTrue(operator2.hasNonEmptySharedBuffer(42));
            Assert.assertFalse(operator2.hasNonEmptyPQ(42));
            Assert.assertFalse(operator2.hasNonEmptySharedBuffer(43));
            Assert.assertFalse(operator2.hasNonEmptyPQ(43));

            this.verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            this.verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
            this.verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
            this.verifyWatermark(harness.getOutput().poll(), 11L);
            this.verifyWatermark(harness.getOutput().poll(), 12L);

            harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
            harness.processElement(new StreamRecord<Event>(endEvent2, 13L));
            harness.processWatermark(20L);
            harness.processWatermark(21L);

            Assert.assertFalse(operator2.hasNonEmptySharedBuffer(42));
            Assert.assertFalse(operator2.hasNonEmptyPQ(42));
            Assert.assertEquals(0L, harness.numEventTimeTimers());

            Assert.assertEquals(3L, harness.getOutput().size());
            this.verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
            this.verifyWatermark(harness.getOutput().poll(), 20L);
            this.verifyWatermark(harness.getOutput().poll(), 21L);
        } finally {
            // Closes whichever harness is current, also when a step above throws.
            harness.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Event-time cleanup scenario for key 41 in which the same event instance
     * ({@code middle1Event1}) is processed twice with the same timestamp. After each watermark
     * the test asserts the number of event-time timers, the priority-queue size and whether the
     * shared buffer is non-empty. The patterns emitted up to watermark 8 are flattened to event
     * lists and handed to {@code compareMaps} together with the eight expected lists.
     */
    @Test
    public void testCEPOperatorCleanupEventTimeWithSameElements() throws Exception {
        Event startEvent = new Event(41, "c", 1.0);
        Event middle1Event1 = new Event(41, "a", 2.0);
        Event middle1Event2 = new Event(41, "a", 3.0);
        Event middle1Event3 = new Event(41, "a", 4.0);
        Event middle2Event1 = new Event(41, "b", 5.0);

        CepOperator<Event, Integer, Map<String, List<Event>>> cepOperator =
                CepOperatorTestUtilities.getKeyedCepOperator(false, new ComplexNFAFactory());

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(cepOperator)) {
            testHarness.open();

            testHarness.processWatermark(new Watermark(Long.MIN_VALUE));

            testHarness.processElement(new StreamRecord<>(middle2Event1, 6L));
            testHarness.processElement(new StreamRecord<>(middle1Event3, 7L));
            testHarness.processElement(new StreamRecord<>(startEvent, 1L));
            testHarness.processElement(new StreamRecord<>(middle1Event1, 3L));
            testHarness.processElement(new StreamRecord<>(middle1Event2, 3L));
            // Same instance, same timestamp, a second time.
            testHarness.processElement(new StreamRecord<>(middle1Event1, 3L));
            testHarness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5L));

            Assert.assertEquals(5L, testHarness.numEventTimeTimers());
            Assert.assertEquals(7L, cepOperator.getPQSize(41));
            Assert.assertFalse(cepOperator.hasNonEmptySharedBuffer(41));

            testHarness.processWatermark(new Watermark(2L));

            this.verifyWatermark(testHarness.getOutput().poll(), Long.MIN_VALUE);
            this.verifyWatermark(testHarness.getOutput().poll(), 2L);

            Assert.assertEquals(5L, testHarness.numEventTimeTimers());
            Assert.assertEquals(6L, cepOperator.getPQSize(41));
            Assert.assertTrue(cepOperator.hasNonEmptySharedBuffer(41));

            testHarness.processWatermark(new Watermark(8L));

            // Drain the output: watermarks are verified, matches are flattened into event lists.
            List<List<Event>> flattenedMatches = new ArrayList<>();
            Object emitted;
            while ((emitted = testHarness.getOutput().poll()) != null) {
                if (emitted instanceof Watermark) {
                    this.verifyWatermark(emitted, 8L);
                } else {
                    @SuppressWarnings("unchecked")
                    StreamRecord<Map<String, List<Event>>> match =
                            (StreamRecord<Map<String, List<Event>>>) emitted;
                    List<Event> flattened = new ArrayList<>();
                    match.getValue().values().forEach(flattened::addAll);
                    flattenedMatches.add(flattened);
                }
            }

            List<List<Event>> expectedMatches =
                    Lists.<List<Event>>newArrayList(
                            Lists.newArrayList(startEvent, middle1Event1),
                            Lists.newArrayList(startEvent, middle1Event1, middle1Event2),
                            Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
                            Lists.newArrayList(
                                    startEvent, middle1Event1, middle1Event2, middle1Event1),
                            Lists.newArrayList(
                                    startEvent, middle1Event1, middle2Event1, middle1Event3),
                            Lists.newArrayList(
                                    startEvent,
                                    middle1Event1,
                                    middle1Event1,
                                    middle1Event2,
                                    middle1Event3),
                            Lists.newArrayList(
                                    startEvent,
                                    middle1Event1,
                                    middle1Event2,
                                    middle2Event1,
                                    middle1Event3),
                            Lists.newArrayList(
                                    startEvent,
                                    middle1Event1,
                                    middle1Event1,
                                    middle1Event2,
                                    middle2Event1,
                                    middle1Event3));
            this.compareMaps(flattenedMatches, expectedMatches);

            Assert.assertEquals(1L, testHarness.numEventTimeTimers());
            Assert.assertEquals(0L, cepOperator.getPQSize(41));
            Assert.assertTrue(cepOperator.hasNonEmptySharedBuffer(41));

            testHarness.processWatermark(new Watermark(17L));
            this.verifyWatermark(testHarness.getOutput().poll(), 17L);

            Assert.assertFalse(cepOperator.hasNonEmptySharedBuffer(41));
            Assert.assertFalse(cepOperator.hasNonEmptyPQ(41));
            Assert.assertEquals(0L, testHarness.numEventTimeTimers());
        }
    }

    /**
     * Builds an operator with a {@code "late-data"} output tag, advances the watermark to 6 and
     * then processes elements with timestamps 4, 5 and 7. Expects the side output for the tag to
     * contain exactly the elements with timestamps 4 and 5, in that order.
     */
    @Test
    public void testCEPOperatorSideOutputLateElementsEventTime() throws Exception {
        Event startEvent = new Event(41, "c", 1.0);
        Event middle1Event1 = new Event(41, "a", 2.0);
        Event middle1Event2 = new Event(41, "a", 3.0);
        Event middle1Event3 = new Event(41, "a", 4.0);

        OutputTag<Event> lateTag = new OutputTag<>("late-data", TypeInformation.of(Event.class));

        CepOperator<Event, Integer, Map<String, List<Event>>> cepOperator =
                CepOperatorTestUtilities.getKeyedCepOperator(
                        false, new ComplexNFAFactory(), null, lateTag);

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(cepOperator)) {
            testHarness.open();

            testHarness.processWatermark(new Watermark(Long.MIN_VALUE));
            testHarness.processElement(new StreamRecord<>(startEvent, 6L));
            this.verifyWatermark(testHarness.getOutput().poll(), Long.MIN_VALUE);

            testHarness.processWatermark(new Watermark(6L));
            this.verifyWatermark(testHarness.getOutput().poll(), 6L);

            // Timestamps 4 and 5 are below the current watermark (6); 7 is above it.
            testHarness.processElement(new StreamRecord<>(middle1Event1, 4L));
            testHarness.processElement(new StreamRecord<>(middle1Event2, 5L));
            testHarness.processElement(new StreamRecord<>(middle1Event3, 7L));

            List<Event> lateEvents = new ArrayList<>();
            while (!testHarness.getSideOutput(lateTag).isEmpty()) {
                lateEvents.add(testHarness.getSideOutput(lateTag).poll().getValue());
            }

            Assert.assertArrayEquals(
                    new Object[] {middle1Event1, middle1Event2}, lateEvents.toArray());
        }
    }

    /**
     * Checks the operator's late-records counter in event time.
     *
     * <p>Two of the four records are sent with a timestamp lower than the watermark that was
     * processed just before them (timestamp 1 after watermark 2, timestamp 3 after watermark 4),
     * and the test expects the counter to report 2.
     *
     * <p>NOTE: this method was recovered by a decompiler, which reported that it restructured the
     * original try block (possible behaviour change).
     */
    @Test
    public void testCEPOperatorLateRecordsMetric() throws Exception {
        Event startEvent = new Event(41, "c", 1.0);
        Event middle1Event1 = new Event(41, "a", 2.0);
        Event middle1Event2 = new Event(41, "a", 3.0);
        Event middle1Event3 = new Event(41, "a", 4.0);

        CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false);
        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.processWatermark(0L);
            harness.processElement(startEvent, 1L);

            harness.processWatermark(2L);
            harness.processElement(middle1Event1, 1L); // behind watermark 2
            harness.processElement(middle1Event2, 3L);

            harness.processWatermark(4L);
            harness.processElement(middle1Event3, 3L); // behind watermark 4

            Assert.assertEquals(2L, operator.getLateRecordsNumber());
        }
    }

    /**
     * Exercises processing-time handling of per-key CEP state across a snapshot/restore cycle.
     *
     * <p>Events for keys 42 and 43 are fed to a first operator, its state is snapshotted and
     * restored into a second operator, the emitted matches are verified, and finally the test
     * checks that the shared buffer for key 42 is empty after processing time has been advanced
     * to 49 and that no event-time timers or queued elements remain.
     *
     * <p>The harness variable is re-assigned after the restore, which is not allowed for a
     * try-with-resources resource (those are implicitly final). It is therefore managed with an
     * explicit {@code try}/{@code finally} that closes whichever harness is current.
     */
    @Test
    public void testCEPOperatorCleanupProcessingTime() throws Exception {
        Event startEvent1 = new Event(42, "start", 1.0);
        Event startEvent2 = new Event(42, "start", 2.0);
        SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
        SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
        SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
        Event endEvent1 = new Event(42, "end", 1.0);
        Event endEvent2 = new Event(42, "end", 2.0);
        Event startEventK2 = new Event(43, "start", 1.0);

        CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(true);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();
            harness.setProcessingTime(0L);

            harness.processElement(new StreamRecord<Event>(startEvent1, 1L));
            harness.processElement(new StreamRecord<Event>(startEventK2, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2L));
            harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
            harness.processElement(
                    new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));

            // Nothing is expected to be queued for either key, while both shared buffers
            // already hold data.
            Assert.assertFalse(operator.hasNonEmptyPQ(42));
            Assert.assertFalse(operator.hasNonEmptyPQ(43));
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(42));
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(43));

            harness.setProcessingTime(3L);
            harness.processElement(new StreamRecord<Event>(startEvent2, 3L));
            harness.processElement(new StreamRecord<Event>(middleEvent2, 4L));

            // Snapshot, then continue on a fresh operator restored from that snapshot.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            CepOperator<Event, Integer, Map<String, List<Event>>> operator2 =
                    getKeyedCepOperator(true);
            harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.setProcessingTime(3L);
            harness.processElement(new StreamRecord<Event>(endEvent1, 5L));

            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);

            harness.setProcessingTime(11L);
            harness.processElement(new StreamRecord<Event>(middleEvent3, 11L));
            harness.processElement(new StreamRecord<Event>(endEvent2, 12L));

            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);

            harness.setProcessingTime(21L);
            Assert.assertFalse(operator2.hasNonEmptySharedBuffer(42));

            harness.processElement(new StreamRecord<Event>(startEvent1, 21L));
            Assert.assertTrue(operator2.hasNonEmptySharedBuffer(42));

            harness.setProcessingTime(49L);
            // Any further element for key 42; the shared buffer is expected to be empty after it.
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2L));

            Assert.assertFalse(operator2.hasNonEmptySharedBuffer(42));
            Assert.assertEquals(0L, (long) harness.numEventTimeTimers());
            Assert.assertFalse(operator2.hasNonEmptyPQ(42));
            Assert.assertFalse(operator2.hasNonEmptyPQ(43));
        } finally {
            harness.close();
        }
    }

    /**
     * Runs a looping pattern with an iterative condition on top of the embedded RocksDB state
     * backend and compares the emitted matches with a fixed expected set.
     *
     * <p>The "middle" stage accepts sub-events whose name starts with {@code "foo"} as long as the
     * sum of the prices already accepted for "middle" plus the candidate's price stays below 5.0.
     * Matches are compared ignoring the order of matches and of events within a match (see
     * {@code compareMaps}).
     *
     * <p>NOTE: this method was recovered by a decompiler, which reported that it restructured the
     * original try block (possible behaviour change).
     */
    @Test
    public void testCEPOperatorSerializationWRocksDB() throws Exception {
        String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        Event startEvent1 = new Event(40, "start", 1.0);
        Event startEvent2 = new Event(40, "start", 2.0);
        SubEvent middleEvent1 = new SubEvent(40, "foo1", 1.0, 10.0);
        SubEvent middleEvent2 = new SubEvent(40, "foo2", 2.0, 10.0);
        SubEvent middleEvent3 = new SubEvent(40, "foo3", 3.0, 10.0);
        SubEvent middleEvent4 = new SubEvent(40, "foo4", 1.0, 10.0);
        Event nextOne = new Event(40, "next-one", 1.0);
        Event endEvent = new Event(40, "end", 1.0);

        final Pattern<Event, ?> pattern =
                Pattern.<Event>begin("start")
                        .where(
                                SimpleCondition.of(
                                        (FilterFunction<Event> & Serializable)
                                                value -> value.getName().equals("start")))
                        .followedBy("middle")
                        .subtype(SubEvent.class)
                        .where(
                                new IterativeCondition<SubEvent>() {
                                    private static final long serialVersionUID =
                                            6215754202506583964L;

                                    @Override
                                    public boolean filter(
                                            SubEvent value,
                                            IterativeCondition.Context<SubEvent> ctx)
                                            throws Exception {
                                        if (!value.getName().startsWith("foo")) {
                                            return false;
                                        }
                                        // Prices already accepted for "middle", then the candidate.
                                        double total = 0.0;
                                        for (Event accepted : ctx.getEventsForPattern("middle")) {
                                            total += accepted.getPrice();
                                        }
                                        total += value.getPrice();
                                        return Double.compare(total, 5.0) < 0;
                                    }
                                })
                        .oneOrMore()
                        .allowCombinations()
                        .followedBy("end")
                        .where(
                                SimpleCondition.of(
                                        (FilterFunction<Event> & Serializable)
                                                value -> value.getName().equals("end")));

        CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                CepOperatorTestUtilities.getKeyedCepOperator(
                        false,
                        new NFACompiler.NFAFactory<Event>() {
                            private static final long serialVersionUID = 477082663248051994L;

                            @Override
                            public NFA<Event> createNFA() {
                                return NFACompiler.compileFactory(pattern, false).createNFA();
                            }
                        });

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.setStateBackend(rocksDBStateBackend);
            harness.setCheckpointStorage(new JobManagerCheckpointStorage());
            harness.open();

            harness.processWatermark(0L);
            harness.processElement(new StreamRecord<Event>(startEvent1, 1L));
            harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
            harness.processWatermark(2L);
            harness.processElement(new StreamRecord<Event>(middleEvent3, 5L));
            harness.processElement(new StreamRecord<Event>(middleEvent2, 3L));
            harness.processElement(new StreamRecord<Event>(startEvent2, 4L));
            harness.processWatermark(5L);
            harness.processElement(new StreamRecord<Event>(nextOne, 7L));
            harness.processElement(new StreamRecord<Event>(endEvent, 8L));
            harness.processElement(new StreamRecord<Event>(middleEvent4, 6L));
            harness.processWatermark(100L);

            // Flatten every emitted match into a single list of events; watermarks are skipped.
            List<List<Event>> resultingPatterns = new ArrayList<List<Event>>();
            while (!harness.getOutput().isEmpty()) {
                Object output = harness.getOutput().poll();
                if (!(output instanceof Watermark)) {
                    @SuppressWarnings("unchecked")
                    StreamRecord<Map<String, List<Event>>> matchRecord =
                            (StreamRecord<Map<String, List<Event>>>) output;
                    List<Event> flattened = new ArrayList<Event>();
                    for (List<Event> stageEvents : matchRecord.getValue().values()) {
                        flattened.addAll(stageEvents);
                    }
                    resultingPatterns.add(flattened);
                }
            }

            List<List<Event>> expectedPatterns = new ArrayList<List<Event>>();
            expectedPatterns.add(
                    Lists.<Event>newArrayList(
                            startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4));
            expectedPatterns.add(
                    Lists.<Event>newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1));
            expectedPatterns.add(
                    Lists.<Event>newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1));
            expectedPatterns.add(
                    Lists.<Event>newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4));
            expectedPatterns.add(
                    Lists.<Event>newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1));
            expectedPatterns.add(Lists.<Event>newArrayList(startEvent1, endEvent, middleEvent1));
            expectedPatterns.add(Lists.<Event>newArrayList(startEvent2, endEvent, middleEvent3));

            compareMaps(resultingPatterns, expectedPatterns);
        }
    }

    /**
     * Checks the order of emitted matches in processing time when the operator is created with an
     * event comparator (see {@code getKeyedCepOperatorWithComparator}), across a snapshot/restore
     * cycle.
     *
     * <p>The middle and start events are sent in the order middle2, middle1, start2 at the same
     * processing time; the test expects the matches in the order (start1, middle1),
     * (start1, middle2), (start2, middle1), (start2, middle2).
     *
     * <p>The harness variable is re-assigned after the restore, which is not allowed for a
     * try-with-resources resource (those are implicitly final). It is therefore managed with an
     * explicit {@code try}/{@code finally} that closes whichever harness is current.
     */
    @Test
    public void testCEPOperatorComparatorProcessTime() throws Exception {
        Event startEvent1 = new Event(42, "start", 1.0);
        Event startEvent2 = new Event(42, "start", 2.0);
        SubEvent middleEvent1 = new SubEvent(42, "foo1", 3.0, 10.0);
        SubEvent middleEvent2 = new SubEvent(42, "foo2", 4.0, 10.0);
        Event endEvent1 = new Event(42, "end", 1.0);
        Event startEventK2 = new Event(43, "start", 1.0);

        CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                getKeyedCepOperatorWithComparator(true);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();
            harness.setProcessingTime(0L);

            harness.processElement(new StreamRecord<Event>(startEvent1, 0L));
            harness.processElement(new StreamRecord<Event>(startEventK2, 0L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 0L));
            harness.processElement(
                    new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 0L));

            // The shared buffers are expected to be filled only once processing time advances.
            Assert.assertFalse(operator.hasNonEmptySharedBuffer(42));
            Assert.assertFalse(operator.hasNonEmptySharedBuffer(43));

            harness.setProcessingTime(3L);
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(42));
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(43));

            harness.processElement(new StreamRecord<Event>(middleEvent2, 3L));
            harness.processElement(new StreamRecord<Event>(middleEvent1, 3L));
            harness.processElement(new StreamRecord<Event>(startEvent2, 3L));

            // Snapshot, then continue on a fresh operator restored from that snapshot.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            CepOperator<Event, Integer, Map<String, List<Event>>> operator2 =
                    getKeyedCepOperatorWithComparator(true);
            harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.setProcessingTime(4L);
            harness.processElement(new StreamRecord<Event>(endEvent1, 5L));
            harness.setProcessingTime(5L);

            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent1, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
        } finally {
            harness.close();
        }
    }

    /**
     * Checks the order of emitted matches in event time when the operator is created with an
     * event comparator (see {@code getKeyedCepOperatorWithComparator}), across a snapshot/restore
     * cycle.
     *
     * <p>Before watermark 3 the test expects the elements to be queued (non-empty priority queue,
     * empty shared buffer); after it, the reverse. The two middle events share timestamp 5 and are
     * sent as middle2, middle1; the matches are expected in the order (start1, middle1),
     * (start1, middle2), (start2, middle1), (start2, middle2), followed by watermark 6.
     *
     * <p>The harness variable is re-assigned after the restore, which is not allowed for a
     * try-with-resources resource (those are implicitly final). It is therefore managed with an
     * explicit {@code try}/{@code finally} that closes whichever harness is current.
     */
    @Test
    public void testCEPOperatorComparatorEventTime() throws Exception {
        Event startEvent1 = new Event(42, "start", 1.0);
        Event startEvent2 = new Event(42, "start", 2.0);
        SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
        SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
        Event endEvent = new Event(42, "end", 1.0);
        Event startEventK2 = new Event(43, "start", 1.0);

        CepOperator<Event, Integer, Map<String, List<Event>>> operator =
                getKeyedCepOperatorWithComparator(false);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();
            harness.processWatermark(0L);

            harness.processElement(new StreamRecord<Event>(startEvent1, 1L));
            harness.processElement(new StreamRecord<Event>(startEventK2, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2L));
            harness.processElement(
                    new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));

            // Before the watermark: elements are queued, nothing is in the shared buffers yet.
            Assert.assertTrue(operator.hasNonEmptyPQ(42));
            Assert.assertTrue(operator.hasNonEmptyPQ(43));
            Assert.assertFalse(operator.hasNonEmptySharedBuffer(42));
            Assert.assertFalse(operator.hasNonEmptySharedBuffer(43));

            harness.processWatermark(3L);

            // After the watermark: queues are drained and the shared buffers hold data.
            Assert.assertFalse(operator.hasNonEmptyPQ(42));
            Assert.assertFalse(operator.hasNonEmptyPQ(43));
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(42));
            Assert.assertTrue(operator.hasNonEmptySharedBuffer(43));

            harness.processElement(new StreamRecord<Event>(startEvent2, 4L));
            harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
            harness.processElement(new StreamRecord<Event>(middleEvent1, 5L));

            // Snapshot, then continue on a fresh operator restored from that snapshot.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            CepOperator<Event, Integer, Map<String, List<Event>>> operator2 =
                    getKeyedCepOperatorWithComparator(false);
            harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(endEvent, 6L));
            harness.processWatermark(6L);

            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent);
            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent1, endEvent);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent);
            verifyWatermark(harness.getOutput().poll(), 6L);
        } finally {
            harness.close();
        }
    }

    /**
     * Asserts that {@code outputObject} is a {@link Watermark} carrying the given timestamp.
     *
     * @param outputObject element taken from the harness output (may be of any type)
     * @param timestamp expected watermark timestamp
     */
    private void verifyWatermark(Object outputObject, long timestamp) {
        Assert.assertTrue(outputObject instanceof Watermark);
        Watermark watermark = (Watermark) outputObject;
        Assert.assertEquals(timestamp, watermark.getTimestamp());
    }

    /**
     * Asserts that {@code outputObject} is a {@link StreamRecord} whose value is a map in which
     * the first element of the lists stored under {@code "start"}, {@code "middle"} and
     * {@code "end"} equals the given events.
     *
     * @param outputObject element taken from the harness output (may be of any type)
     * @param start expected first event under {@code "start"}
     * @param middle expected first event under {@code "middle"}
     * @param end expected first event under {@code "end"}
     */
    private void verifyPattern(Object outputObject, Event start, SubEvent middle, Event end) {
        Assert.assertTrue(outputObject instanceof StreamRecord);
        Object matchValue = ((StreamRecord<?>) outputObject).getValue();

        Assert.assertTrue(matchValue instanceof Map);
        Map<?, ?> eventsByStage = (Map<?, ?>) matchValue;

        Assert.assertEquals(start, ((List<?>) eventsByStage.get("start")).get(0));
        Assert.assertEquals(middle, ((List<?>) eventsByStage.get("middle")).get(0));
        Assert.assertEquals(end, ((List<?>) eventsByStage.get("end")).get(0));
    }

    /**
     * Creates a keyed CEP operator that uses the pattern built by {@link NFAFactory}.
     *
     * @param isProcessingTime forwarded unchanged to {@code CepOperatorTestUtilities}
     * @return the operator created by {@code CepOperatorTestUtilities.getKeyedCepOperator}
     */
    private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator(boolean isProcessingTime) {
        NFACompiler.NFAFactory<Event> nfaFactory = new NFAFactory();
        return CepOperatorTestUtilities.getKeyedCepOperator(isProcessingTime, nfaFactory);
    }

    /**
     * Creates a keyed CEP operator that uses the pattern built by {@link NFAFactory} together with
     * an event comparator that orders events by ascending price.
     *
     * @param isProcessingTime forwarded unchanged to {@code CepOperatorTestUtilities}
     * @return the operator created by {@code CepOperatorTestUtilities.getKeyedCepOperator}
     */
    private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator(boolean isProcessingTime) {
        // Fully qualified: the simple name EventComparator refers to a nested class of this test.
        org.apache.flink.cep.EventComparator<Event> byPrice =
                new org.apache.flink.cep.EventComparator<Event>() {

                    @Override
                    public int compare(Event o1, Event o2) {
                        return Double.compare(o1.getPrice(), o2.getPrice());
                    }
                };
        return CepOperatorTestUtilities.getKeyedCepOperator(isProcessingTime, new NFAFactory(), byPrice);
    }

    /**
     * Asserts that {@code actual} and {@code expected} contain the same matches, ignoring both the
     * order of the matches and the order of the events inside each match.
     *
     * <p>Both arguments are sorted in place (inner lists first, then the outer lists), so they
     * must be mutable.
     *
     * @param actual matches produced by the operator, one event list per match
     * @param expected matches the test expects, one event list per match
     */
    private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
        Assert.assertEquals(expected.size(), actual.size());

        // Normalise the event order inside every match.
        EventComparator eventOrder = new EventComparator();
        for (List<Event> match : actual) {
            match.sort(eventOrder);
        }
        for (List<Event> match : expected) {
            match.sort(eventOrder);
        }

        // Normalise the order of the matches themselves.
        ListEventComparator matchOrder = new ListEventComparator();
        actual.sort(matchOrder);
        expected.sort(matchOrder);

        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
    }

    /**
     * Creates a test harness around a keyed CEP operator that uses the pattern built by
     * {@link NFAFactory}.
     *
     * <p>Uses {@link #getKeyedCepOperator(boolean)}, like the tests in this class, instead of the
     * misspelled duplicate helper; both build the operator the same way.
     *
     * @param isProcessingTime forwarded unchanged to the operator factory
     * @return the harness created by {@code CepOperatorTestUtilities.getCepTestHarness}
     * @throws Exception if creating the harness fails
     */
    private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception {
        return CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOperator(isProcessingTime));
    }

    /**
     * Misspelled alias of {@link #getKeyedCepOperator(boolean)}, kept so that existing callers
     * keep compiling. It delegates to the correctly spelled helper so the two cannot drift apart.
     *
     * @param isProcessingTime forwarded unchanged to {@link #getKeyedCepOperator(boolean)}
     * @return the operator created by {@link #getKeyedCepOperator(boolean)}
     */
    private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean isProcessingTime) {
        return getKeyedCepOperator(isProcessingTime);
    }

    private static class NFAFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").subtype(SubEvent.class).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getVolume() > 5.0)).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end"))).within(Duration.ofMillis(10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }

    private static class TimedOutProcessFunction
    extends PatternProcessFunction<Event, Map<String, List<Event>>>
    implements TimedOutPartialMatchHandler<Event> {
        private final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag;

        private TimedOutProcessFunction(OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag) {
            this.timedOutTag = timedOutTag;
        }

        public void processMatch(Map<String, List<Event>> match, PatternProcessFunction.Context ctx, Collector<Map<String, List<Event>>> out) throws Exception {
            out.collect(match);
        }

        public void processTimedOutMatch(Map<String, List<Event>> match, PatternProcessFunction.Context ctx) throws Exception {
            ctx.output(this.timedOutTag, (Object)Tuple2.of(match, (Object)ctx.timestamp()));
        }
    }

    private static class SimpleNFAFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private SimpleNFAFactory() {
            this(false);
        }

        private SimpleNFAFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("c"))).followedBy("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("a"))).followedBy("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("b"))).within(Duration.ofMillis(10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }

    private static class ComplexNFAFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private ComplexNFAFactory() {
            this(false);
        }

        private ComplexNFAFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("c"))).followedBy("middle1").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("a"))).oneOrMore().optional().followedBy("middle2").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("b"))).optional().followedBy("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("a"))).within(Duration.ofMillis(10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }

    private class EventComparator
    implements Comparator<Event> {
        private EventComparator() {
        }

        @Override
        public int compare(Event o1, Event o2) {
            int nameComp = o1.getName().compareTo(o2.getName());
            int priceComp = Double.compare(o1.getPrice(), o2.getPrice());
            int idComp = Integer.compare(o1.getId(), o2.getId());
            if (nameComp == 0) {
                if (priceComp == 0) {
                    return idComp;
                }
                return priceComp;
            }
            return nameComp;
        }
    }

    /**
     * Orders event lists by size first; lists of equal size are compared element by element with
     * {@code EventComparator}, and the first differing position decides.
     */
    private class ListEventComparator
    implements Comparator<List<Event>> {
        private ListEventComparator() {
        }

        @Override
        public int compare(List<Event> o1, List<Event> o2) {
            int sizeComp = Integer.compare(o1.size(), o2.size());
            if (sizeComp != 0) {
                return sizeComp;
            }

            // Same length: the first position at which the events differ decides.
            EventComparator elementOrder = new EventComparator();
            for (int index = 0; index < o1.size(); ++index) {
                int elementComp = elementOrder.compare(o1.get(index), o2.get(index));
                if (elementComp != 0) {
                    return elementComp;
                }
            }
            return 0;
        }
    }
}

