package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadDriver;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.class */
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, UserCommandKey, CountTimeWritable> {
    public static final String INPUT_PATH_KEY = "auditreplay.input-path";
    public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
    public static final String NUM_THREADS_KEY = "auditreplay.num-threads";
    public static final int NUM_THREADS_DEFAULT = 1;
    public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
    public static final boolean CREATE_BLOCKS_DEFAULT = true;
    public static final String RATE_FACTOR_KEY = "auditreplay.rate-factor";
    public static final double RATE_FACTOR_DEFAULT = 1.0d;
    public static final String COMMAND_PARSER_KEY = "auditreplay.command-parser.class";
    public static final Class<AuditLogDirectParser> COMMAND_PARSER_DEFAULT = AuditLogDirectParser.class;
    private static final Logger LOG = LoggerFactory.getLogger(AuditReplayMapper.class);
    private static final long MAX_READAHEAD_MS = 60000;
    public static final String INDIVIDUAL_COMMANDS_COUNTER_GROUP = "INDIVIDUAL_COMMANDS";
    public static final String INDIVIDUAL_COMMANDS_LATENCY_SUFFIX = "_LATENCY";
    public static final String INDIVIDUAL_COMMANDS_INVALID_SUFFIX = "_INVALID";
    public static final String INDIVIDUAL_COMMANDS_COUNT_SUFFIX = "_COUNT";
    private long startTimestampMs;
    private int numThreads;
    private double rateFactor;
    private long highestTimestamp;
    private List<AuditReplayThread> threads;
    private DelayQueue<AuditReplayCommand> commandQueue;
    private Function<Long, Long> relativeToAbsoluteTimestamp;
    private AuditCommandParser commandParser;
    private ScheduledThreadPoolExecutor progressExecutor;

    /* loaded from: input_file:org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper$CommandType.class */
    public enum CommandType {
        READ,
        WRITE
    }

    /* loaded from: input_file:org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper$REPLAYCOUNTERS.class */
    public enum REPLAYCOUNTERS {
        TOTALCOMMANDS,
        TOTALINVALIDCOMMANDS,
        TOTALUNSUPPORTEDCOMMANDS,
        LATECOMMANDS,
        LATECOMMANDSTOTALTIME,
        TOTALWRITECOMMANDS,
        TOTALWRITECOMMANDLATENCY,
        TOTALREADCOMMANDS,
        TOTALREADCOMMANDLATENCY
    }

    /* loaded from: input_file:org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper$ReplayCommand.class */
    public enum ReplayCommand {
        APPEND(CommandType.WRITE),
        CREATE(CommandType.WRITE),
        GETFILEINFO(CommandType.READ),
        CONTENTSUMMARY(CommandType.READ),
        MKDIRS(CommandType.WRITE),
        RENAME(CommandType.WRITE),
        LISTSTATUS(CommandType.READ),
        DELETE(CommandType.WRITE),
        OPEN(CommandType.READ),
        SETPERMISSION(CommandType.WRITE),
        SETOWNER(CommandType.WRITE),
        SETTIMES(CommandType.WRITE),
        SETREPLICATION(CommandType.WRITE),
        CONCAT(CommandType.WRITE);

        private final CommandType type;

        ReplayCommand(CommandType commandType) {
            this.type = commandType;
        }

        public CommandType getType() {
            return this.type;
        }
    }

    @Override // org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper
    public String getDescription() {
        return "This mapper replays audit log files.";
    }

    @Override // org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper
    public List<String> getConfigDescriptions() {
        return Lists.newArrayList(new String[]{"auditreplay.input-path (required): Path to directory containing input files.", "auditreplay.output-path (required): Path to destination for output files.", "auditreplay.num-threads (default 1): Number of threads to use per mapper for replay.", "auditreplay.create-blocks (default true): Whether or not to create 1-byte blocks when performing `create` commands.", "auditreplay.rate-factor (default 1.0): Multiplicative speed at which to replay the audit log; e.g. a value of 2.0 would make the replay occur at twice the original speed. This can be useful to induce heavier loads."});
    }

    @Override // org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper
    public boolean verifyConfigurations(Configuration configuration) {
        return (configuration.get(INPUT_PATH_KEY) == null || configuration.get(OUTPUT_PATH_KEY) == null) ? false : true;
    }

    public void setup(Mapper.Context context) throws IOException {
        Configuration configuration = context.getConfiguration();
        this.startTimestampMs = configuration.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1L);
        this.numThreads = configuration.getInt(NUM_THREADS_KEY, 1);
        this.rateFactor = configuration.getDouble(RATE_FACTOR_KEY, 1.0d);
        try {
            this.commandParser = (AuditCommandParser) configuration.getClass(COMMAND_PARSER_KEY, COMMAND_PARSER_DEFAULT, AuditCommandParser.class).getConstructor(new Class[0]).newInstance(new Object[0]);
            this.commandParser.initialize(configuration);
            this.relativeToAbsoluteTimestamp = l -> {
                return Long.valueOf(this.startTimestampMs + Math.round(l.longValue() / this.rateFactor));
            };
            LOG.info("Starting " + this.numThreads + " threads");
            this.progressExecutor = new ScheduledThreadPoolExecutor(1);
            long j = configuration.getLong("mapreduce.task.timeout", 120000L) / 2;
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = this.progressExecutor;
            Objects.requireNonNull(context);
            scheduledThreadPoolExecutor.scheduleAtFixedRate(context::progress, j, j, TimeUnit.MILLISECONDS);
            this.threads = new ArrayList();
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            this.commandQueue = new DelayQueue<>();
            for (int i = 0; i < this.numThreads; i++) {
                AuditReplayThread auditReplayThread = new AuditReplayThread(context, this.commandQueue, concurrentHashMap);
                this.threads.add(auditReplayThread);
                auditReplayThread.start();
            }
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new IOException("Exception encountered while instantiating the command parser", e);
        }
    }

    public void map(LongWritable longWritable, Text text, Mapper.Context context) throws IOException, InterruptedException {
        AuditReplayCommand parse = this.commandParser.parse(text, this.relativeToAbsoluteTimestamp);
        long delay = parse.getDelay(TimeUnit.MILLISECONDS);
        if (delay > MAX_READAHEAD_MS) {
            Thread.sleep(delay - 30000);
        }
        this.commandQueue.put((DelayQueue<AuditReplayCommand>) parse);
        this.highestTimestamp = parse.getAbsoluteTimestamp();
    }

    public void cleanup(Mapper.Context context) throws InterruptedException, IOException {
        Iterator<AuditReplayThread> it = this.threads.iterator();
        while (it.hasNext()) {
            it.next().addToQueue(AuditReplayCommand.getPoisonPill(this.highestTimestamp + 1));
        }
        Optional empty = Optional.empty();
        for (AuditReplayThread auditReplayThread : this.threads) {
            auditReplayThread.join();
            auditReplayThread.drainCounters(context);
            auditReplayThread.drainCommandLatencies(context);
            if (auditReplayThread.getException() != null) {
                empty = Optional.of(auditReplayThread.getException());
            }
        }
        this.progressExecutor.shutdown();
        if (empty.isPresent()) {
            throw new RuntimeException("Exception in AuditReplayThread", (Throwable) empty.get());
        }
        LOG.info("Time taken to replay the logs in ms: " + (System.currentTimeMillis() - this.startTimestampMs));
        long value = context.getCounter(REPLAYCOUNTERS.TOTALCOMMANDS).getValue();
        if (value != 0) {
            LOG.info("Percentage of invalid ops: " + ((context.getCounter(REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue() * 100.0d) / value));
        }
    }

    @Override // org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper
    public void configureJob(Job job) {
        job.setMapOutputKeyClass(UserCommandKey.class);
        job.setMapOutputValueClass(CountTimeWritable.class);
        job.setInputFormatClass(NoSplitTextInputFormat.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(AuditReplayReducer.class);
        job.setOutputKeyClass(UserCommandKey.class);
        job.setOutputValueClass(CountTimeWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY)));
        job.getConfiguration().set(TextOutputFormat.SEPARATOR, ",");
    }
}
