/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.nfa;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.cep.utils.NFAUtils;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.cep.utils.TestTimerService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NFAStatusChangeITCase {
    private SharedBuffer<Event> sharedBuffer;
    private SharedBufferAccessor<Event> sharedBufferAccessor;
    private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
    private TimerService timerService = new TestTimerService();

    @Before
    public void init() {
        this.sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
        this.sharedBufferAccessor = this.sharedBuffer.getAccessor();
    }

    @After
    public void clear() throws Exception {
        this.sharedBufferAccessor.close();
    }

    @Test
    public void testNFAChange() throws Exception {
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("a"))).followedByAny("middle").where((IterativeCondition)new IterativeCondition<Event>(){
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("b");
            }
        }).oneOrMore().optional().allowCombinations().followedBy("middle2").where((IterativeCondition)new IterativeCondition<Event>(){
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("d");
            }
        }).followedBy("end").where((IterativeCondition)new IterativeCondition<Event>(){
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("e");
            }
        }).within(Duration.ofMillis(10L));
        NFA nfa = NFAUtils.compile(pattern, true);
        NFAState nfaState = nfa.createInitialNFAState();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(1, "b", 1.0), 1L, this.skipStrategy, this.timerService);
        Assert.assertFalse((String)"NFA status should not change as the event does not match the take condition of the 'start' state", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(2, "a", 1.0), 2L, this.skipStrategy, this.timerService);
        Assert.assertTrue((String)"NFA status should change as the event matches the take condition of the 'start' state", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(3, "f", 1.0), 3L, this.skipStrategy, this.timerService);
        Assert.assertTrue((String)"NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(4, "f", 1.0), 4L, this.skipStrategy, this.timerService);
        Assert.assertFalse((String)"NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(5, "b", 1.0), 5L, this.skipStrategy, this.timerService);
        Assert.assertTrue((String)"NFA status should change as the event matches the take condition of 'middle:2' state", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(6, "d", 1.0), 6L, this.skipStrategy, this.timerService);
        Assert.assertTrue((String)"NFA status should change as the event matches the take condition of 'middle2' state", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        nfa.advanceTime(this.sharedBufferAccessor, nfaState, 8L, this.skipStrategy);
        Assert.assertFalse((String)"NFA status should not change as the timestamp is within the window", (boolean)nfaState.isStateChanged());
        nfaState.resetStateChanged();
        Collection timeoutResults = (Collection)nfa.advanceTime(this.sharedBufferAccessor, (NFAState)nfaState, (long)12L, (AfterMatchSkipStrategy)this.skipStrategy).f1;
        Assert.assertTrue((String)"NFA status should change as timeout happens", (nfaState.isStateChanged() && !timeoutResults.isEmpty() ? 1 : 0) != 0);
    }

    @Test
    public void testNFAChangedOnOneNewComputationState() throws Exception {
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedBy("a*").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("a"))).oneOrMore().optional().next("end").where((IterativeCondition)new IterativeCondition<Event>(){
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("b");
            }
        }).within(Duration.ofMillis(10L));
        NFA nfa = NFAUtils.compile(pattern, true);
        NFAState nfaState = nfa.createInitialNFAState();
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(6, "start", 1.0), 6L, this.skipStrategy, this.timerService);
        nfaState.resetStateChanged();
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(6, "a", 1.0), 7L, this.skipStrategy, this.timerService);
        Assert.assertTrue((boolean)nfaState.isStateChanged());
    }

    @Test
    public void testNFAChangedOnTimeoutWithoutPrune() throws Exception {
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)new IterativeCondition<Event>(){

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("start");
            }
        }).followedBy("end").where((IterativeCondition)new IterativeCondition<Event>(){
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("end");
            }
        }).within(Duration.ofMillis(10L));
        NFA nfa = NFAUtils.compile(pattern, true);
        NFAState nfaState = nfa.createInitialNFAState();
        nfaState.resetStateChanged();
        nfa.advanceTime(this.sharedBufferAccessor, nfaState, 6L, this.skipStrategy);
        nfa.process(this.sharedBufferAccessor, nfaState, (Object)new Event(6, "start", 1.0), 6L, this.skipStrategy, this.timerService);
        nfaState.resetStateChanged();
        nfa.advanceTime(this.sharedBufferAccessor, nfaState, 17L, this.skipStrategy);
        Assert.assertTrue((boolean)nfaState.isStateChanged());
    }
}

