package org.apache.oozie;

import java.io.IOException;
import java.io.StringReader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BaseEngine;
import org.apache.oozie.cli.OozieCLI;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.OperationType;
import org.apache.oozie.command.wf.BulkWorkflowXCommand;
import org.apache.oozie.command.wf.CompletedActionXCommand;
import org.apache.oozie.command.wf.DefinitionXCommand;
import org.apache.oozie.command.wf.ExternalIdXCommand;
import org.apache.oozie.command.wf.JobXCommand;
import org.apache.oozie.command.wf.JobsXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitHiveXCommand;
import org.apache.oozie.command.wf.SubmitHttpXCommand;
import org.apache.oozie.command.wf.SubmitMRXCommand;
import org.apache.oozie.command.wf.SubmitPigXCommand;
import org.apache.oozie.command.wf.SubmitSqoopXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogAuditFilter;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogUserFilterParam;

/* loaded from: input_file:WEB-INF/lib/oozie-core-4.2.0-mapr-1710-r4.jar:org/apache/oozie/DagEngine.class */
public class DagEngine extends BaseEngine {
    private static final int HIGH_PRIORITY = 2;
    private static XLog LOG = XLog.getLog(DagEngine.class);
    private static final Set<String> FILTER_NAMES = new HashSet();

    public DagEngine() {
    }

    public DagEngine(String str) {
        this();
        this.user = ParamChecker.notEmpty(str, "user");
    }

    @Override // org.apache.oozie.BaseEngine
    public String submitJob(Configuration configuration, boolean z) throws DagEngineException {
        validateSubmitConfiguration(configuration);
        try {
            String call = new SubmitXCommand(configuration).call();
            if (z) {
                start(call);
            }
            return call;
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    public String submitJobFromCoordinator(Configuration configuration, String str) throws DagEngineException {
        validateSubmitConfiguration(configuration);
        try {
            String call = new SubmitXCommand(configuration, str).call();
            start(call);
            return call;
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    public String submitHttpJob(Configuration configuration, String str) throws DagEngineException {
        validateSubmitConfiguration(configuration);
        try {
            SubmitHttpXCommand submitHttpXCommand = null;
            if (str.equals(OozieCLI.PIG_CMD)) {
                submitHttpXCommand = new SubmitPigXCommand(configuration);
            } else if (str.equals(OozieCLI.MR_CMD)) {
                submitHttpXCommand = new SubmitMRXCommand(configuration);
            } else if (str.equals(OozieCLI.HIVE_CMD)) {
                submitHttpXCommand = new SubmitHiveXCommand(configuration);
            } else if (str.equals(OozieCLI.SQOOP_CMD)) {
                submitHttpXCommand = new SubmitSqoopXCommand(configuration);
            }
            String call = submitHttpXCommand.call();
            start(call);
            return call;
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    private void validateSubmitConfiguration(Configuration configuration) throws DagEngineException {
        if (configuration.get(OozieClient.APP_PATH) == null) {
            throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void start(String str) throws DagEngineException {
        try {
            new StartXCommand(str).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void resume(String str) throws DagEngineException {
        try {
            new ResumeXCommand(str).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void suspend(String str) throws DagEngineException {
        try {
            new SuspendXCommand(str).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void kill(String str) throws DagEngineException {
        try {
            new KillXCommand(str).call();
            LOG.info("User " + this.user + " killed the WF job " + str);
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void change(String str, String str2) throws DagEngineException {
        throw new DagEngineException(ErrorCode.E1017, new Object[0]);
    }

    @Override // org.apache.oozie.BaseEngine
    public void reRun(String str, Configuration configuration) throws DagEngineException {
        try {
            XConfiguration xConfiguration = new XConfiguration(new StringReader(WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, str).getConf()));
            XConfiguration.copy(configuration, xConfiguration);
            validateReRunConfiguration(xConfiguration);
            new ReRunXCommand(str, xConfiguration).call();
        } catch (IOException e) {
            throw new DagEngineException(ErrorCode.E0803, e.getMessage());
        } catch (CommandException e2) {
            throw new DagEngineException(e2);
        } catch (JPAExecutorException e3) {
            throw new DagEngineException(e3);
        }
    }

    private void validateReRunConfiguration(Configuration configuration) throws DagEngineException {
        if (configuration.get(OozieClient.APP_PATH) == null) {
            throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
        }
        if (configuration.get(OozieClient.RERUN_SKIP_NODES) == null && configuration.get(OozieClient.RERUN_FAIL_NODES) == null) {
            throw new DagEngineException(ErrorCode.E0401, "oozie.wf.rerun.skip.nodes OR oozie.wf.rerun.failnodes");
        }
        if (configuration.get(OozieClient.RERUN_SKIP_NODES) != null && configuration.get(OozieClient.RERUN_FAIL_NODES) != null) {
            throw new DagEngineException(ErrorCode.E0404, "oozie.wf.rerun.skip.nodes OR oozie.wf.rerun.failnodes");
        }
    }

    public void processCallback(String str, String str2, Properties properties) throws DagEngineException {
        XLog.Info.get().clearParameter(XLogService.GROUP);
        XLog.Info.get().clearParameter(XLogService.USER);
        if (((CallableQueueService) Services.get().get(CallableQueueService.class)).queue(new CompletedActionXCommand(str, str2, properties, 2))) {
            return;
        }
        LOG.warn(4, "queue is full or system is in SAFEMODE, ignoring callback", new Object[0]);
    }

    @Override // org.apache.oozie.BaseEngine
    public WorkflowJob getJob(String str) throws DagEngineException {
        try {
            return new JobXCommand(str).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public WorkflowJob getJob(String str, int i, int i2) throws DagEngineException {
        try {
            return new JobXCommand(str, i, i2).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public String getDefinition(String str) throws DagEngineException {
        try {
            return new DefinitionXCommand(str).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void streamLog(String str, Writer writer, Map<String, String[]> map) throws IOException, DagEngineException {
        streamJobLog(str, writer, map, BaseEngine.LOG_TYPE.LOG);
    }

    @Override // org.apache.oozie.BaseEngine
    public void streamErrorLog(String str, Writer writer, Map<String, String[]> map) throws IOException, DagEngineException {
        streamJobLog(str, writer, map, BaseEngine.LOG_TYPE.ERROR_LOG);
    }

    @Override // org.apache.oozie.BaseEngine
    public void streamAuditLog(String str, Writer writer, Map<String, String[]> map) throws IOException, DagEngineException {
        try {
            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(map)), str, writer, map, BaseEngine.LOG_TYPE.AUDIT_LOG);
        } catch (CommandException e) {
            throw new IOException(e);
        }
    }

    private void streamJobLog(String str, Writer writer, Map<String, String[]> map, BaseEngine.LOG_TYPE log_type) throws IOException, DagEngineException {
        try {
            streamJobLog(new XLogFilter(new XLogUserFilterParam(map)), str, writer, map, log_type);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void streamJobLog(XLogFilter xLogFilter, String str, Writer writer, Map<String, String[]> map, BaseEngine.LOG_TYPE log_type) throws IOException, DagEngineException {
        try {
            xLogFilter.setParameter(DagXLogInfoService.JOB, str);
            WorkflowJob job = getJob(str);
            Date endTime = job.getEndTime();
            if (endTime == null) {
                endTime = job.getLastModifiedTime();
            }
            fetchLog(xLogFilter, job.getCreatedTime(), endTime, writer, map, log_type);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    protected Map<String, List<String>> parseFilter(String str) throws DagEngineException {
        HashMap hashMap = new HashMap();
        if (str != null) {
            StringTokenizer stringTokenizer = new StringTokenizer(str, ";");
            while (stringTokenizer.hasMoreTokens()) {
                String nextToken = stringTokenizer.nextToken();
                if (!nextToken.contains("=")) {
                    throw new DagEngineException(ErrorCode.E0420, str, "elements must be name=value pairs");
                }
                String[] split = nextToken.split("=");
                if (split.length != 2) {
                    throw new DagEngineException(ErrorCode.E0420, str, "elements must be name=value pairs");
                }
                split[0] = split[0].toLowerCase();
                if (!FILTER_NAMES.contains(split[0])) {
                    throw new DagEngineException(ErrorCode.E0420, str, XLog.format("invalid name [{0}]", split[0]));
                }
                if (split[0].equals("status")) {
                    try {
                        WorkflowJob.Status.valueOf(split[1]);
                    } catch (IllegalArgumentException e) {
                        throw new DagEngineException(ErrorCode.E0420, str, XLog.format("invalid status [{0}]", split[1]));
                    }
                }
                List list = (List) hashMap.get(split[0]);
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(split[0], list);
                }
                list.add(split[1]);
            }
        }
        return hashMap;
    }

    public WorkflowsInfo getJobs(String str, int i, int i2) throws DagEngineException {
        try {
            return new JobsXCommand(parseFilter(str), i, i2).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public String getJobIdForExternalId(String str) throws DagEngineException {
        try {
            return new ExternalIdXCommand(str).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public CoordinatorJob getCoordJob(String str) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
    }

    @Override // org.apache.oozie.BaseEngine
    public CoordinatorJob getCoordJob(String str, String str2, int i, int i2, boolean z) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
    }

    public WorkflowActionBean getWorkflowAction(String str) throws BaseEngineException {
        try {
            return new WorkflowActionInfoXCommand(str).call();
        } catch (CommandException e) {
            throw new BaseEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public String dryRunSubmit(Configuration configuration) throws BaseEngineException {
        try {
            return new SubmitXCommand(true, configuration).call();
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public String getJobStatus(String str) throws DagEngineException {
        try {
            return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW_STATUS, str).getStatusStr();
        } catch (JPAExecutorException e) {
            throw new DagEngineException(e);
        }
    }

    @Override // org.apache.oozie.BaseEngine
    public void enableSLAAlert(String str, String str2, String str3, String str4) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow"));
    }

    @Override // org.apache.oozie.BaseEngine
    public void disableSLAAlert(String str, String str2, String str3, String str4) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow"));
    }

    @Override // org.apache.oozie.BaseEngine
    public void changeSLA(String str, String str2, String str3, String str4, String str5) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow"));
    }

    public WorkflowsInfo killJobs(String str, int i, int i2) throws DagEngineException {
        try {
            WorkflowsInfo call = new BulkWorkflowXCommand(parseFilter(str), i, i2, OperationType.Kill).call();
            return call == null ? new WorkflowsInfo(new ArrayList(), 0, 0, 0) : call;
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    public WorkflowsInfo suspendJobs(String str, int i, int i2) throws DagEngineException {
        try {
            WorkflowsInfo call = new BulkWorkflowXCommand(parseFilter(str), i, i2, OperationType.Suspend).call();
            return call == null ? new WorkflowsInfo(new ArrayList(), 0, 0, 0) : call;
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    public WorkflowsInfo resumeJobs(String str, int i, int i2) throws DagEngineException {
        try {
            WorkflowsInfo call = new BulkWorkflowXCommand(parseFilter(str), i, i2, OperationType.Resume).call();
            return call == null ? new WorkflowsInfo(new ArrayList(), 0, 0, 0) : call;
        } catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    static {
        FILTER_NAMES.add("user");
        FILTER_NAMES.add("name");
        FILTER_NAMES.add("group");
        FILTER_NAMES.add("status");
        FILTER_NAMES.add("id");
        FILTER_NAMES.add("startcreatedtime");
        FILTER_NAMES.add("endcreatedtime");
    }
}
