/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input.operator;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.state.api.input.operator.StateReaderOperator;
import org.apache.flink.state.api.input.operator.window.WindowContents;
import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * A {@link StateReaderOperator} that reads the contents of window state and hands them, one
 * (key, window) pair at a time, to a user supplied {@link WindowReaderFunction}.
 *
 * <p>Instances are created through the static factories {@link #reduce}, {@link #aggregate},
 * {@link #process} and {@link #evictingWindow}; each one pairs a state descriptor registered under
 * the name {@code "window-contents"} with the matching {@link WindowContents} accessor.
 *
 * @param <S> The type of the state holding the window contents.
 * @param <KEY> The type of the key.
 * @param <IN> The type of the elements passed to the reader function.
 * @param <W> The type of the window, used as the state namespace.
 * @param <OUT> The type of the records emitted by the reader function.
 */
@Internal
public class WindowReaderOperator<S extends State, KEY, IN, W extends Window, OUT>
        extends StateReaderOperator<WindowReaderFunction<IN, OUT, KEY, W>, KEY, W, OUT> {

    /** Name under which the window contents state descriptor is created. */
    private static final String WINDOW_STATE_NAME = "window-contents";

    /** Name of the internal timer service, also used as namespace for the timer list states. */
    private static final String WINDOW_TIMER_NAME = "window-timers";

    /** Converts the raw window state into the iterable handed to the reader function. */
    private final WindowContents<S, IN> contents;

    /** Descriptor of the state that holds the window contents. */
    private final StateDescriptor<S, ?> descriptor;

    /** Context passed to the reader function; created in {@link #open()}. */
    private transient Context ctx;

    /**
     * Creates an operator that reads window contents kept in reducing state.
     *
     * @param function The reduce function used to build the state descriptor.
     * @param reader The user function invoked for every key and window.
     * @param keyType The type information of the key.
     * @param windowSerializer The serializer of the window (state namespace).
     * @param inputType The type information of the reduced values.
     * @return A reader operator backed by a {@link ReducingStateDescriptor}.
     */
    public static <KEY, T, W extends Window, OUT> WindowReaderOperator<?, KEY, T, W, OUT> reduce(
            ReduceFunction<T> function,
            WindowReaderFunction<T, OUT, KEY, W> reader,
            TypeInformation<KEY> keyType,
            TypeSerializer<W> windowSerializer,
            TypeInformation<T> inputType) {
        ReducingStateDescriptor<T> descriptor =
                new ReducingStateDescriptor<>(WINDOW_STATE_NAME, function, inputType);
        return new WindowReaderOperator<>(
                reader, keyType, windowSerializer, WindowContents.reducingState(), descriptor);
    }

    /**
     * Creates an operator that reads window contents kept in aggregating state.
     *
     * @param function The aggregate function used to build the state descriptor.
     * @param readerFunction The user function invoked for every key and window.
     * @param keyType The type information of the key.
     * @param windowSerializer The serializer of the window (state namespace).
     * @param accumulatorType The type information of the accumulator.
     * @return A reader operator backed by an {@link AggregatingStateDescriptor}.
     */
    public static <KEY, T, ACC, R, OUT, W extends Window>
            WindowReaderOperator<?, KEY, R, W, OUT> aggregate(
                    AggregateFunction<T, ACC, R> function,
                    WindowReaderFunction<R, OUT, KEY, W> readerFunction,
                    TypeInformation<KEY> keyType,
                    TypeSerializer<W> windowSerializer,
                    TypeInformation<ACC> accumulatorType) {
        AggregatingStateDescriptor<T, ACC, R> descriptor =
                new AggregatingStateDescriptor<>(WINDOW_STATE_NAME, function, accumulatorType);
        return new WindowReaderOperator<>(
                readerFunction,
                keyType,
                windowSerializer,
                WindowContents.aggregatingState(),
                descriptor);
    }

    /**
     * Creates an operator that reads window contents kept in list state.
     *
     * @param readerFunction The user function invoked for every key and window.
     * @param keyType The type information of the key.
     * @param windowSerializer The serializer of the window (state namespace).
     * @param stateType The type information of the list elements.
     * @return A reader operator backed by a {@link ListStateDescriptor}.
     */
    public static <KEY, T, W extends Window, OUT> WindowReaderOperator<?, KEY, T, W, OUT> process(
            WindowReaderFunction<T, OUT, KEY, W> readerFunction,
            TypeInformation<KEY> keyType,
            TypeSerializer<W> windowSerializer,
            TypeInformation<T> stateType) {
        ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(WINDOW_STATE_NAME, stateType);
        return new WindowReaderOperator<>(
                readerFunction, keyType, windowSerializer, WindowContents.listState(), descriptor);
    }

    /**
     * Creates an operator that reads window contents kept in list state whose elements are
     * serialized with a {@link StreamElementSerializer} wrapping the serializer of {@code
     * stateType}.
     *
     * @param readerFunction The user function invoked for every key and window.
     * @param keyType The type information of the key.
     * @param windowSerializer The serializer of the window (state namespace).
     * @param stateType The type information of the values wrapped in the stream records.
     * @param config The execution config providing the serializer configuration.
     * @return A reader operator backed by a {@link ListStateDescriptor} of stream records.
     */
    public static <KEY, T, W extends Window, OUT>
            WindowReaderOperator<?, KEY, StreamRecord<T>, W, OUT> evictingWindow(
                    WindowReaderFunction<StreamRecord<T>, OUT, KEY, W> readerFunction,
                    TypeInformation<KEY> keyType,
                    TypeSerializer<W> windowSerializer,
                    TypeInformation<T> stateType,
                    ExecutionConfig config) {
        // The element serializer is used as a serializer of stream records here; the raw
        // construction plus unchecked cast is confined to this single local.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                (TypeSerializer<StreamRecord<T>>)
                        new StreamElementSerializer(
                                stateType.createSerializer(config.getSerializerConfig()));
        ListStateDescriptor<StreamRecord<T>> descriptor =
                new ListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);
        return new WindowReaderOperator<>(
                readerFunction, keyType, windowSerializer, WindowContents.listState(), descriptor);
    }

    /**
     * Private constructor; use the static factories instead.
     *
     * @param function The user function invoked for every key and window.
     * @param keyType The type information of the key.
     * @param namespaceSerializer The serializer of the window (state namespace).
     * @param contents The accessor turning window state into an iterable; must not be null.
     * @param descriptor The descriptor of the window contents state; must not be null.
     */
    private WindowReaderOperator(
            WindowReaderFunction<IN, OUT, KEY, W> function,
            TypeInformation<KEY> keyType,
            TypeSerializer<W> namespaceSerializer,
            WindowContents<S, IN> contents,
            StateDescriptor<S, ?> descriptor) {
        super(function, keyType, namespaceSerializer);
        Preconditions.checkNotNull(contents, "WindowContents must not be null");
        Preconditions.checkNotNull(descriptor, "The state descriptor must not be null");
        this.contents = contents;
        this.descriptor = descriptor;
    }

    /** Opens the parent operator and then builds the context handed to the reader function. */
    @Override
    public void open() throws Exception {
        super.open();
        this.ctx = new Context(getKeyedStateBackend(), getInternalTimerService(WINDOW_TIMER_NAME));
    }

    /**
     * Reads the window contents stored for the given window and passes them to the reader
     * function.
     *
     * @param key The key handed to the reader function.
     * @param namespace The window whose state is read; also exposed through the context.
     * @param out The collector the reader function emits to.
     */
    @Override
    public void processElement(KEY key, W namespace, Collector<OUT> out) throws Exception {
        // The context is shared across calls, so point it at the window being processed first.
        this.ctx.window = namespace;
        S state =
                getKeyedStateBackend()
                        .getPartitionedState(namespace, this.namespaceSerializer, this.descriptor);
        this.function.readWindow(key, this.ctx, this.contents.contents(state), out);
    }

    /**
     * Returns an iterator over all (key, window) pairs the keyed state backend reports for the
     * window contents state. Closing the iterator closes the underlying stream.
     */
    @Override
    public CloseableIterator<Tuple2<KEY, W>> getKeysAndNamespaces(SavepointRuntimeContext ctx)
            throws Exception {
        Stream<Tuple2<KEY, W>> keysAndWindows =
                getKeyedStateBackend().getKeysAndNamespaces(this.descriptor.getName());
        return new IteratorWithRemove<>(keysAndWindows);
    }

    /**
     * Collects the timestamps held in the given list state into a set.
     *
     * @param timerState The list state holding timer timestamps.
     * @return The timestamps as a set, or an empty set when the state returns {@code null}.
     */
    private static Set<Long> collectTimers(ListState<Long> timerState) throws Exception {
        Iterable<Long> timers = timerState.get();
        if (timers == null) {
            return Collections.emptySet();
        }
        return StreamSupport.stream(timers.spliterator(), false).collect(Collectors.toSet());
    }

    /**
     * The {@link WindowReaderFunction.Context} implementation handed to the reader function. The
     * {@link #window} field is reassigned by the operator before each invocation.
     */
    private class Context implements WindowReaderFunction.Context<W> {
        private static final String EVENT_TIMER_STATE = "event-time-timers";
        private static final String PROC_TIMER_STATE = "proc-time-timers";

        /** The window currently being read. */
        W window;

        /** State store scoped to the current window; its window is set in windowState(). */
        final PerWindowKeyedStateStore perWindowKeyedStateStore;

        /** State store returned by globalState(). */
        final DefaultKeyedStateStore keyedStateStore;

        /** List state filled with the event time timers reported by the timer service. */
        ListState<Long> eventTimers;

        /** List state filled with the processing time timers reported by the timer service. */
        ListState<Long> procTimers;

        /**
         * Creates the state stores and copies the timers reported by the timer service into two
         * list states.
         *
         * @param keyedStateBackend The backend used to create the state stores and list states.
         * @param timerService The timer service whose timers are copied.
         */
        private Context(
                KeyedStateBackend<KEY> keyedStateBackend, InternalTimerService<W> timerService)
                throws Exception {
            this.keyedStateStore =
                    new DefaultKeyedStateStore(
                            keyedStateBackend, WindowReaderOperator.this.getSerializerFactory());
            this.perWindowKeyedStateStore = new PerWindowKeyedStateStore(keyedStateBackend);

            // NOTE(review): the namespace argument of both callbacks is ignored, so timers are
            // not filtered by window here - confirm this is intended.
            this.eventTimers =
                    keyedStateBackend.getPartitionedState(
                            WINDOW_TIMER_NAME,
                            StringSerializer.INSTANCE,
                            new ListStateDescriptor<>(EVENT_TIMER_STATE, Types.LONG));
            timerService.forEachEventTimeTimer((namespace, timer) -> this.eventTimers.add(timer));

            this.procTimers =
                    keyedStateBackend.getPartitionedState(
                            WINDOW_TIMER_NAME,
                            StringSerializer.INSTANCE,
                            new ListStateDescriptor<>(PROC_TIMER_STATE, Types.LONG));
            timerService.forEachProcessingTimeTimer(
                    (namespace, timer) -> this.procTimers.add(timer));
        }

        /** Returns the window currently being read. */
        @Override
        public W window() {
            return this.window;
        }

        /**
         * Looks up state for the given descriptor in the namespace of the current window.
         *
         * @throws RuntimeException if the keyed state backend fails to provide the state.
         */
        @Override
        public <TS extends State> TS triggerState(StateDescriptor<TS, ?> descriptor) {
            try {
                return WindowReaderOperator.this
                        .getKeyedStateBackend()
                        .getPartitionedState(
                                this.window,
                                WindowReaderOperator.this.namespaceSerializer,
                                descriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve trigger state", e);
            }
        }

        /** Returns the per-window state store after pointing it at the current window. */
        @Override
        public KeyedStateStore windowState() {
            this.perWindowKeyedStateStore.window = this.window;
            return this.perWindowKeyedStateStore;
        }

        /** Returns the state store that is not bound to the current window. */
        @Override
        public KeyedStateStore globalState() {
            return this.keyedStateStore;
        }

        /** Returns the event time timers held in the event timer list state as a set. */
        @Override
        public Set<Long> registeredEventTimeTimers() throws Exception {
            return collectTimers(this.eventTimers);
        }

        /** Returns the processing time timers held in the processing timer list state as a set. */
        @Override
        public Set<Long> registeredProcessingTimeTimers() throws Exception {
            return collectTimers(this.procTimers);
        }
    }

    /**
     * Adapts a {@link Stream} to a {@link CloseableIterator}: iteration is delegated to the
     * stream's iterator, {@link #remove()} does nothing, and {@link #close()} closes the stream.
     */
    private static class IteratorWithRemove<T> implements CloseableIterator<T> {
        private final Iterator<T> iterator;
        private final AutoCloseable resource;

        private IteratorWithRemove(Stream<T> stream) {
            this.iterator = stream.iterator();
            this.resource = stream;
        }

        @Override
        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        @Override
        public T next() {
            return this.iterator.next();
        }

        /** Does nothing; calling it neither throws nor modifies the underlying stream. */
        @Override
        public void remove() {
        }

        @Override
        public void close() throws Exception {
            this.resource.close();
        }
    }

    /**
     * A {@link DefaultKeyedStateStore} that resolves state in the namespace of {@link #window}
     * using the operator's namespace serializer.
     */
    private class PerWindowKeyedStateStore extends DefaultKeyedStateStore {
        /** The window used as namespace for state lookups; assigned by the owning context. */
        W window;

        PerWindowKeyedStateStore(KeyedStateBackend<?> keyedStateBackend) {
            super(keyedStateBackend, WindowReaderOperator.this.getSerializerFactory());
        }

        @Override
        protected <SS extends State> SS getPartitionedState(StateDescriptor<SS, ?> stateDescriptor)
                throws Exception {
            return this.keyedStateBackend.getPartitionedState(
                    this.window, WindowReaderOperator.this.namespaceSerializer, stateDescriptor);
        }
    }
}

