/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailingIdentityMapper<T>
extends RichMapFunction<T, T>
implements ListCheckpointed<Integer>,
CheckpointListener,
Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
    private static final long serialVersionUID = 6334389850158707313L;
    public static volatile boolean failedBefore;
    private final int failCount;
    private int numElementsTotal;
    private int numElementsThisTime;
    private boolean failer;
    private boolean hasBeenCheckpointed;
    private Thread printer;
    private volatile boolean printerRunning = true;

    public FailingIdentityMapper(int failCount) {
        this.failCount = failCount;
    }

    public void open(OpenContext openContext) {
        this.failer = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0;
        this.printer = new Thread((Runnable)this, "FailingIdentityMapper Status Printer");
        this.printer.start();
    }

    public T map(T value) throws Exception {
        ++this.numElementsTotal;
        ++this.numElementsThisTime;
        if (!failedBefore) {
            Thread.sleep(10L);
            if (this.failer && this.numElementsTotal >= this.failCount) {
                failedBefore = true;
                throw new Exception("Artificial Test Failure");
            }
        }
        return value;
    }

    public void close() throws Exception {
        this.printerRunning = false;
        if (this.printer != null) {
            this.printer.interrupt();
            this.printer = null;
        }
    }

    public void notifyCheckpointComplete(long checkpointId) {
        this.hasBeenCheckpointed = true;
    }

    public void notifyCheckpointAborted(long checkpointId) {
    }

    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
        return Collections.singletonList(this.numElementsTotal);
    }

    public void restoreState(List<Integer> state) throws Exception {
        if (state.isEmpty() || state.size() > 1) {
            throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
        }
        this.numElementsTotal = state.get(0);
    }

    @Override
    public void run() {
        while (this.printerRunning) {
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            LOG.info("============================> Failing mapper  {}: count={}, totalCount={}", new Object[]{this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.numElementsThisTime, this.numElementsTotal});
        }
    }
}

