/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.dsv2.wordcount;

import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeDescriptor;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.connector.dsv2.Source;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.dsv2.WrappedSource;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.ParameterTool;
import org.apache.flink.util.TimeUtils;

/**
 * Word-count example built on the DataStream V2 API.
 *
 * <p>The job reads lines of text, splits every line into lower-cased words, keys the stream by
 * word and emits an updated {@code (word, count)} pair for each word occurrence it sees.
 *
 * <p>Command-line options (parsed with {@link ParameterTool}):
 *
 * <ul>
 *   <li>{@code --input <path>}: read text lines from this path. When absent, the bundled
 *       {@link WordCountData#WORDS} are used as input.
 *   <li>{@code --discovery-interval <duration>}: only honoured together with {@code --input};
 *       the parsed duration is passed to {@code FileSourceBuilder#monitorContinuously}.
 *   <li>{@code --output <path>}: write the results with a {@link FileSink} to this path. When
 *       absent, the results are handed to a {@link PrintSink}.
 * </ul>
 */
public class WordCount {

    /**
     * Builds the word-count pipeline from the command-line arguments and executes it.
     *
     * @param args command-line arguments, see the class documentation for the supported options
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
        // This example always runs in STREAMING mode, also for the bounded in-memory input.
        env.setExecutionMode(RuntimeExecutionMode.STREAMING);

        // ---- source -------------------------------------------------------------------------
        final NonKeyedPartitionStream<String> text;
        if (params.has("input")) {
            // Read the given path as plain text, one record per line.
            FileSource.FileSourceBuilder<String> builder =
                    FileSource.forRecordStreamFormat(
                            new TextLineInputFormat(), new Path(params.get("input")));
            if (params.has("discovery-interval")) {
                Duration discoveryInterval =
                        TimeUtils.parseDuration(params.get("discovery-interval"));
                builder.monitorContinuously(discoveryInterval);
            }
            text = env.fromSource(new WrappedSource<>(builder.build()), "file-input");
        } else {
            // No input given: fall back to the example data shipped with the examples.
            text =
                    env.fromSource(
                            DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)),
                            "in-memory-input");
        }

        // ---- transformation -----------------------------------------------------------------
        // Split lines into (word, 1) pairs and partition them by the word (field f0) so that
        // the Counter keeps one count per word.
        KeyedPartitionStream<String, Tuple2<String, Integer>> keyedStream =
                text.process(new Tokenizer())
                        .withName("tokenizer")
                        .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0);

        NonKeyedPartitionStream<Tuple2<String, Integer>> counts =
                keyedStream.process(new Counter());

        // ---- sink ---------------------------------------------------------------------------
        if (params.has("output")) {
            // Row-encoded file output; part files roll at 1 MiB or after 10 seconds.
            counts.toSink(
                            new WrappedSink<>(
                                    FileSink.<Tuple2<String, Integer>>forRowFormat(
                                                    new Path(params.get("output")),
                                                    new SimpleStringEncoder<>())
                                            .withRollingPolicy(
                                                    DefaultRollingPolicy.builder()
                                                            .withMaxPartSize(
                                                                    MemorySize.ofMebiBytes(1L))
                                                            .withRolloverInterval(
                                                                    Duration.ofSeconds(10L))
                                                            .build())
                                            .build()))
                    .withName("file-sink");
        } else {
            counts.toSink(new WrappedSink<>(new PrintSink<Tuple2<String, Integer>>()))
                    .withName("print-sink");
        }

        env.execute("WordCount");
    }

    /**
     * Splits each input line into words and emits one {@code (word, 1)} pair per word.
     *
     * <p>A line is lower-cased and then split on runs of non-word characters ({@code \W+}).
     */
    public static final class Tokenizer
            implements OneInputStreamProcessFunction<String, Tuple2<String, Integer>> {

        /** Separator between words; compiled once instead of once per record. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        /**
         * Tokenizes one line and emits a {@code (word, 1)} pair for every non-empty token.
         *
         * @param record the input line
         * @param output collector receiving the {@code (word, 1)} pairs
         * @param ctx runtime context of the current partition (unused)
         * @throws Exception declared by the implemented interface
         */
        @Override
        public void processRecord(
                String record,
                Collector<Tuple2<String, Integer>> output,
                PartitionedContext<Tuple2<String, Integer>> ctx)
                throws Exception {
            // Locale.ROOT keeps the result independent of the JVM default locale; with e.g. a
            // Turkish default locale "I" would become a dotless i, which \W treats as a
            // separator and would therefore split the word.
            String[] tokens = NON_WORD.split(record.toLowerCase(Locale.ROOT));
            for (String token : tokens) {
                // split() yields an empty leading token when the line starts with a separator
                // (and a single empty token for an empty line).
                if (!token.isEmpty()) {
                    output.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /**
     * Keeps a count per key in a value state named {@code "count"} and emits the updated
     * {@code (word, count)} pair for every incoming record.
     */
    public static final class Counter
            implements OneInputStreamProcessFunction<
                    Tuple2<String, Integer>, Tuple2<String, Integer>> {

        /** Declaration of the {@code Integer} value state that holds the count. */
        private final ValueStateDeclaration<Integer> countStateDeclaration =
                StateDeclarations.valueState("count", TypeDescriptors.INT);

        /**
         * Declares the states this function accesses.
         *
         * @return a set containing only the count state declaration
         */
        @Override
        public Set<StateDeclaration> usesStates() {
            return Set.of(countStateDeclaration);
        }

        /**
         * Adds the record's count to the stored count, stores the sum and emits it.
         *
         * @param record a {@code (word, count)} pair
         * @param output collector receiving the {@code (word, updatedCount)} pair
         * @param ctx runtime context used to access the count state
         * @throws Exception if accessing the state fails
         */
        @Override
        public void processRecord(
                Tuple2<String, Integer> record,
                Collector<Tuple2<String, Integer>> output,
                PartitionedContext<Tuple2<String, Integer>> ctx)
                throws Exception {
            String word = record.f0;
            Integer count = record.f1;

            // The state value is null until the first update.
            Integer previousCount =
                    ctx.getStateManager().getState(countStateDeclaration).value();
            Integer newlyCount = previousCount == null ? count : previousCount + count;

            ctx.getStateManager().getState(countStateDeclaration).update(newlyCount);
            output.collect(Tuple2.of(word, newlyCount));
        }
    }
}

