package org.apache.oozie.service;

import java.io.IOException;
import java.io.StringReader;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.cli.OozieCLI;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleCoordSubmitXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.wf.ActionEndXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/* loaded from: input_file:WEB-INF/lib/oozie-core-4.3.0-mapr-mep-5.x-1912-r1.jar:org/apache/oozie/service/RecoveryService.class */
public class RecoveryService implements Service {
    public static final String RECOVERY_SERVICE_CONF_PREFIX = "oozie.service.RecoveryService.";
    public static final String CONF_PREFIX_WF_ACTIONS = "oozie.service.RecoveryService.wf.actions.";
    public static final String CONF_PREFIX_COORD = "oozie.service.RecoveryService.coord.";
    public static final String CONF_PREFIX_BUNDLE = "oozie.service.RecoveryService.bundle.";
    public static final String CONF_SERVICE_INTERVAL = "oozie.service.RecoveryService.interval";
    public static final String CONF_CALLABLE_BATCH_SIZE = "oozie.service.RecoveryService.callable.batch.size";
    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = "oozie.service.RecoveryService.push.dependency.interval";
    public static final String CONF_WF_ACTIONS_OLDER_THAN = "oozie.service.RecoveryService.wf.actions.older.than";
    public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = "oozie.service.RecoveryService.wf.actions.created.time.interval";
    public static final String CONF_COORD_OLDER_THAN = "oozie.service.RecoveryService.coord.older.than";
    public static final String CONF_BUNDLE_OLDER_THAN = "oozie.service.RecoveryService.bundle.older.than";
    private static final String INSTRUMENTATION_GROUP = "recovery";
    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
    public static final long ONE_DAY_MILLISCONDS = 90000000;

    /* loaded from: input_file:WEB-INF/lib/oozie-core-4.3.0-mapr-mep-5.x-1912-r1.jar:org/apache/oozie/service/RecoveryService$RecoveryRunnable.class */
    static class RecoveryRunnable implements Runnable {
        private final long olderThan;
        private final long coordOlderThan;
        private final long bundleOlderThan;
        private List<XCallable<?>> callables;
        private List<XCallable<?>> delayedCallables;
        private long delay = 0;
        private StringBuilder msg = null;
        private JPAService jpaService = null;

        /**
         * Creates a recovery task bound to the three age thresholds used by its queries.
         *
         * @param j  stored as {@code olderThan}; passed to the pending workflow-action query
         * @param j2 stored as {@code coordOlderThan}; multiplied by 1000 and subtracted from the current time
         *           to build the cut-off timestamp for the coordinator-action queries
         * @param j3 stored as {@code bundleOlderThan}; passed to the waiting bundle-action query
         */
        public RecoveryRunnable(long j, long j2, long j3) {
            this.bundleOlderThan = j3;
            this.coordOlderThan = j2;
            this.olderThan = j;
        }

        /**
         * Performs one recovery pass.
         *
         * <p>Resets the per-run message buffer and the {@code JPAService} reference, then runs workflow-action,
         * coordinator-action and bundle recovery, in that order. Those steps buffer commands through
         * {@code queueCallable}, which already hands off full batches; whatever is still buffered afterwards
         * (immediate and delayed) is passed to the {@code CallableQueueService} here and the buffers are reset.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            final XLog log = XLog.getLog(getClass());
            this.msg = new StringBuilder();
            this.jpaService = (JPAService) Services.get().get(JPAService.class);

            runWFRecovery();
            runCoordActionRecovery();
            runBundleRecovery();
            log.debug("QUEUED [{0}] for potential recovery", this.msg.toString());

            // Flush the partially filled batch of immediate commands, if any.
            if (this.callables != null) {
                final CallableQueueService queueService =
                        (CallableQueueService) Services.get().get(CallableQueueService.class);
                final boolean accepted = queueService.queueSerial(this.callables);
                if (!accepted) {
                    log.warn("Unable to queue the callables commands for RecoveryService. Most possibly command queue is full. Queue size is :" + queueService.queueSize());
                }
                this.callables = null;
            }

            // Flush the partially filled batch of delayed commands, using the largest delay seen so far.
            if (this.delayedCallables != null) {
                final CallableQueueService queueService =
                        (CallableQueueService) Services.get().get(CallableQueueService.class);
                final boolean accepted = queueService.queueSerial(this.delayedCallables, this.delay);
                if (!accepted) {
                    log.warn("Unable to queue the delayedCallables commands for RecoveryService. Most possibly Callable queue is full. Queue size is :" + queueService.queueSize());
                }
                this.delayedCallables = null;
                this.delay = 0L;
            }
        }

        /**
         * Recovers the bundle actions returned by {@code GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN} for the
         * configured {@code bundleOlderThan} threshold.
         *
         * <p>Each returned action increments the bundle-actions instrumentation counter and is then handled by
         * {@code recoverBundleAction}. A failure while handling one action is logged and does not stop the
         * remaining ones. If the query itself fails, a warning is logged and nothing is queued.
         */
        private void runBundleRecovery() {
            XLog.Info.get().clear();
            final XLog log = XLog.getLog(getClass());

            final List<BundleActionBean> waitingActions;
            try {
                waitingActions = BundleActionQueryExecutor.getInstance().getList(
                        BundleActionQueryExecutor.BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN,
                        Long.valueOf(this.bundleOlderThan));
            } catch (JPAExecutorException e) {
                log.warn("Error reading bundle actions from database", e);
                return;
            }

            this.msg.append(", BUNDLE_ACTIONS : ").append(waitingActions.size());
            for (BundleActionBean action : waitingActions) {
                try {
                    // NOTE(review): unlike the workflow and coordinator passes, this counter is incremented
                    // before the server-ownership check - confirm that is intended.
                    ((InstrumentationService) Services.get().get(InstrumentationService.class)).get().incr(
                            RecoveryService.INSTRUMENTATION_GROUP,
                            RecoveryService.INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1L);
                    recoverBundleAction(action, log);
                } catch (Exception e) {
                    // Best effort: one broken action must not prevent recovery of the others.
                    log.error("Exception, {0}", e.getMessage(), e);
                }
            }
        }

        /**
         * Queues the command that matches the state of a single bundle action.
         *
         * <ul>
         * <li>no coordinator id and status other than PREP: logged as an error, nothing queued;</li>
         * <li>bundle id not assigned to this server: skipped;</li>
         * <li>PREP with no coordinator id: delegated to {@code recoverUnsubmittedCoord};</li>
         * <li>KILLED: {@code CoordKillXCommand};</li>
         * <li>SUSPENDED / SUSPENDEDWITHERROR: {@code CoordSuspendXCommand};</li>
         * <li>RUNNING / RUNNINGWITHERROR: {@code CoordResumeXCommand}.</li>
         * </ul>
         */
        private void recoverBundleAction(BundleActionBean action, XLog log) throws Exception {
            final Job.Status status = action.getStatus();
            final String coordId = action.getCoordId();

            if (coordId == null && status != Job.Status.PREP) {
                log.error("CoordId is null for Bundle action " + action.getBundleActionId());
                return;
            }
            final JobsConcurrencyService concurrency =
                    (JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class);
            if (!concurrency.isJobIdForThisServer(action.getBundleId())) {
                return;
            }

            if (status == Job.Status.PREP && coordId == null) {
                recoverUnsubmittedCoord(action, log);
            } else if (status == Job.Status.KILLED) {
                queueCallable(new CoordKillXCommand(coordId));
            } else if (status == Job.Status.SUSPENDED || status == Job.Status.SUSPENDEDWITHERROR) {
                queueCallable(new CoordSuspendXCommand(coordId));
            } else if (status == Job.Status.RUNNING || status == Job.Status.RUNNINGWITHERROR) {
                queueCallable(new CoordResumeXCommand(coordId));
            }
        }

        /**
         * Handles a PREP bundle action that has no coordinator id yet.
         *
         * <p>If a coordinator job already exists for the action's coordinator name and bundle id, only a
         * {@code BundleStatusUpdateXCommand} is queued. Otherwise the bundle job XML is parsed and a
         * {@code BundleCoordSubmitXCommand} is queued for each coordinator element whose resolved name equals
         * the action's coordinator name.
         */
        private void recoverUnsubmittedCoord(BundleActionBean action, XLog log) throws Exception {
            final String coordName = action.getCoordName();
            final String bundleId = action.getBundleId();

            final CoordinatorJobBean existingCoord = CoordJobQueryExecutor.getInstance().getIfExist(
                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, coordName, bundleId);
            if (existingCoord != null) {
                log.debug("Coord [{0}] for bundle [{1}] is submitted , but bundle action is not updated.", coordName, bundleId);
                queueCallable(new BundleStatusUpdateXCommand(
                        CoordJobQueryExecutor.getInstance().getIfExist(
                                CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, coordName,
                                existingCoord.getId()),
                        action.getStatus()));
                return;
            }

            log.debug("Coord [{0}] for bundle [{1}] is not yet submitted , submitting new one", coordName, bundleId);
            final BundleJobBean bundleJob = this.jpaService != null
                    ? BundleJobQueryExecutor.getInstance().get(
                            BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, bundleId)
                    : null;
            if (bundleJob == null) {
                // Previously this fell through to a NullPointerException on bundleJob.getJobXml().
                log.error("Unable to load bundle job [{0}] to submit coord [{1}]; JPAService unavailable or job not found", bundleId, coordName);
                return;
            }

            final Element bundleXml = XmlUtils.parseXml(bundleJob.getJobXml());
            // org.jdom (JDOM 1.x) returns a raw List here; its entries are the matching child Elements.
            @SuppressWarnings("unchecked")
            final List<Element> coordElements =
                    bundleXml.getChildren(OozieCLI.COORD_OPTION, bundleXml.getNamespace());
            for (Element coordElement : coordElements) {
                final String declaredName = coordElement.getAttribute("name").getValue();
                final Configuration coordConf = RecoveryService.mergeConfig(coordElement, bundleJob);
                try {
                    final String resolvedName = ELUtils.resolveAppName(declaredName, coordConf);
                    if (resolvedName.equals(coordName)) {
                        coordConf.set(OozieClient.BUNDLE_ID, bundleId);
                        queueCallable(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), resolvedName));
                    }
                } catch (Exception e) {
                    // A name that cannot be resolved only disqualifies this coordinator element.
                    log.error("Error evaluating coord name " + e.getMessage(), e);
                }
            }
        }

        /**
         * Recovers coordinator actions older than {@code coordOlderThan} seconds.
         *
         * <p>Candidates come from the {@code GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN} and
         * {@code GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN} queries. Actions whose id is not assigned
         * to this server are skipped. For the others, by status:
         * <ul>
         * <li>WAITING: {@code CoordActionInputCheckXCommand}; if the action has non-empty push missing
         *     dependencies, also a delayed {@code CoordPushDependencyCheckXCommand}, with the requested delay
         *     growing by the configured push-dependency interval for each such action;</li>
         * <li>SUBMITTED: {@code CoordActionStartXCommand} with the user and app name of the owning job;</li>
         * <li>SUSPENDED: {@code SuspendXCommand} on the external id, only when it is set and pending &gt; 1;</li>
         * <li>KILLED: {@code KillXCommand} on the external id, when set;</li>
         * <li>RUNNING: {@code ResumeXCommand} on the external id, when set;</li>
         * <li>READY: the job id is collected and passed to {@code runCoordActionRecoveryForReady}.</li>
         * </ul>
         * A failure on one action is logged and does not stop the others. If either query fails, a warning is
         * logged and the READY follow-up is not run.
         */
        private void runCoordActionRecovery() {
            final Set<String> readyJobIds = new HashSet<String>();
            XLog.Info.get().clear();
            final XLog log = XLog.getLog(getClass());
            final long pushCheckInterval = ConfigurationService.getLong(RecoveryService.CONF_PUSH_DEPENDENCY_INTERVAL);
            long nextPushCheckDelay = pushCheckInterval;
            final Timestamp cutoff = new Timestamp(System.currentTimeMillis() - (this.coordOlderThan * 1000));
            final List<CoordinatorActionBean> candidates = new ArrayList<CoordinatorActionBean>();
            try {
                candidates.addAll(CoordActionQueryExecutor.getInstance().getList(
                        CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, cutoff));
                candidates.addAll(CoordActionQueryExecutor.getInstance().getList(
                        CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, cutoff));
                this.msg.append(", COORD_ACTIONS : ").append(candidates.size());

                for (CoordinatorActionBean action : candidates) {
                    try {
                        final String actionId = action.getId();
                        if (!((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isJobIdForThisServer(actionId)) {
                            continue;
                        }
                        ((InstrumentationService) Services.get().get(InstrumentationService.class)).get().incr(
                                RecoveryService.INSTRUMENTATION_GROUP,
                                RecoveryService.INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1L);

                        final CoordinatorAction.Status status = action.getStatus();
                        if (status == CoordinatorAction.Status.WAITING) {
                            queueCallable(new CoordActionInputCheckXCommand(actionId, action.getJobId()));
                            log.debug("Recover a coord action from [WAITING] and resubmit CoordActionInputCheckXCommand :[{0}]", actionId);
                            final String pushMissingDeps = action.getPushMissingDependencies();
                            if (pushMissingDeps != null && pushMissingDeps.length() != 0) {
                                queueCallable(new CoordPushDependencyCheckXCommand(actionId, true, true), nextPushCheckDelay);
                                nextPushCheckDelay += pushCheckInterval;
                                log.debug("Recover a coord action from [WAITING] and resubmit CoordPushDependencyCheckX :[{0}]", actionId);
                            }
                        } else if (status == CoordinatorAction.Status.SUBMITTED) {
                            final CoordinatorJobBean owningJob = CoordJobQueryExecutor.getInstance().get(
                                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_USER_APPNAME, action.getJobId());
                            queueCallable(new CoordActionStartXCommand(actionId, owningJob.getUser(), owningJob.getAppName(), action.getJobId()));
                            log.debug("Recover a coord action from [SUBMITTED] and resubmit CoordActionStartCommand :[{0}]", actionId);
                        } else if (status == CoordinatorAction.Status.SUSPENDED) {
                            if (action.getExternalId() != null && action.getPending() > 1) {
                                queueCallable(new SuspendXCommand(action.getExternalId()));
                                log.debug("Recover a coord action from [SUSPENDED] and resubmit SuspendXCommand :[{0}]", actionId);
                            }
                        } else if (status == CoordinatorAction.Status.KILLED) {
                            if (action.getExternalId() != null) {
                                queueCallable(new KillXCommand(action.getExternalId()));
                                log.debug("Recover a coord action from [KILLED] and resubmit KillXCommand :[{0}]", actionId);
                            }
                        } else if (status == CoordinatorAction.Status.RUNNING) {
                            if (action.getExternalId() != null) {
                                queueCallable(new ResumeXCommand(action.getExternalId()));
                                log.debug("Recover a coord action from [RUNNING] and resubmit ResumeXCommand :[{0}]", actionId);
                            }
                        } else if (status == CoordinatorAction.Status.READY) {
                            // READY actions are recovered per job, after the loop.
                            readyJobIds.add(action.getJobId());
                        }
                    } catch (Exception e) {
                        // Best effort: one broken action must not prevent recovery of the others.
                        log.error("Exception, {0}", e.getMessage(), e);
                    }
                }
                runCoordActionRecoveryForReady(readyJobIds);
            } catch (JPAExecutorException e) {
                log.warn("Error reading coord actions from database", e);
            }
        }

        /**
         * Queues a {@code CoordActionReadyXCommand} for each given coordinator job id that
         * {@code JobsConcurrencyService.getJobIdsForThisServer} returns.
         *
         * <p>The number of returned job ids is appended to the per-run message. Any exception is logged and
         * swallowed.
         *
         * @param set ids of coordinator jobs that have actions in READY status
         */
        private void runCoordActionRecoveryForReady(Set<String> set) {
            XLog.Info.get().clear();
            final XLog log = XLog.getLog(getClass());
            try {
                final JobsConcurrencyService concurrency =
                        (JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class);
                final List<String> ownedJobIds = concurrency.getJobIdsForThisServer(new ArrayList<String>(set));
                this.msg.append(", COORD_READY_JOBS : ").append(ownedJobIds.size());
                for (String jobId : ownedJobIds) {
                    queueCallable(new CoordActionReadyXCommand(jobId));
                    log.debug("Recover a coord action from [READY] resubmit CoordActionReadyXCommand :[{0}]", jobId);
                }
            } catch (Exception e) {
                log.error("Exception, {0}", e.getMessage(), e);
            }
        }

        /**
         * Re-queues commands for the workflow actions returned by {@code GET_PENDING_ACTIONS}.
         *
         * <p>The query receives {@code olderThan} and a timestamp computed as "now" minus the configured
         * created-time interval multiplied by {@code ONE_DAY_MILLISCONDS} (declared in this class as
         * 90000000 ms). Actions whose id is not assigned to this server are skipped. For the others, by status:
         * <ul>
         * <li>PREP, START_MANUAL, USER_RETRY: {@code ActionStartXCommand};</li>
         * <li>START_RETRY: {@code ActionStartXCommand}, delayed by pending age minus current time;</li>
         * <li>DONE, END_MANUAL: {@code ActionEndXCommand};</li>
         * <li>END_RETRY: {@code ActionEndXCommand}, delayed by pending age minus current time;</li>
         * <li>OK, ERROR: {@code SignalXCommand} for the owning job.</li>
         * </ul>
         * A failure on one action is logged and does not stop the others.
         */
        private void runWFRecovery() {
            XLog.Info.get().clear();
            final XLog log = XLog.getLog(getClass());
            try {
                final long now = new Date().getTime();
                final long createdTimeWindowMs =
                        ConfigurationService.getLong(RecoveryService.CONF_WF_ACTIONS_CREATED_TIME_INTERVAL)
                                * RecoveryService.ONE_DAY_MILLISCONDS;
                final List<WorkflowActionBean> pendingActions = WorkflowActionQueryExecutor.getInstance().getList(
                        WorkflowActionQueryExecutor.WorkflowActionQuery.GET_PENDING_ACTIONS,
                        Long.valueOf(this.olderThan), Long.valueOf(now - createdTimeWindowMs));
                this.msg.append(" WF_ACTIONS ").append(pendingActions.size());

                for (WorkflowActionBean action : pendingActions) {
                    try {
                        final String actionId = action.getId();
                        if (!((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isJobIdForThisServer(actionId)) {
                            continue;
                        }
                        ((InstrumentationService) Services.get().get(InstrumentationService.class)).get().incr(
                                RecoveryService.INSTRUMENTATION_GROUP,
                                RecoveryService.INSTR_RECOVERED_ACTIONS_COUNTER, 1L);

                        final WorkflowAction.Status status = action.getStatus();
                        if (status == WorkflowAction.Status.PREP || status == WorkflowAction.Status.START_MANUAL) {
                            queueCallable(new ActionStartXCommand(actionId, action.getType()));
                            log.debug("Recover a workflow action from [{0}] status and resubmit ActionStartXCommand :[{1}]", status, actionId);
                        } else if (status == WorkflowAction.Status.START_RETRY) {
                            final ActionStartXCommand retryStart = new ActionStartXCommand(actionId, action.getType());
                            final long retryDelay = action.getPendingAge().getTime() - System.currentTimeMillis();
                            queueCallable(retryStart, retryDelay);
                            log.debug("Recover a workflow action from [START_RETRY] status and resubmit ActionStartXCommand :[{0}]", actionId);
                        } else if (status == WorkflowAction.Status.DONE || status == WorkflowAction.Status.END_MANUAL) {
                            queueCallable(new ActionEndXCommand(actionId, action.getType()));
                            log.debug("Recover a workflow action from [{0}] status and resubmit ActionEndXCommand :[{1}]", status, actionId);
                        } else if (status == WorkflowAction.Status.END_RETRY) {
                            final ActionEndXCommand retryEnd = new ActionEndXCommand(actionId, action.getType());
                            final long retryDelay = action.getPendingAge().getTime() - System.currentTimeMillis();
                            queueCallable(retryEnd, retryDelay);
                            log.debug("Recover a workflow action from [END_RETRY] status and resubmit ActionEndXCommand :[{0}]", actionId);
                        } else if (status == WorkflowAction.Status.OK || status == WorkflowAction.Status.ERROR) {
                            queueCallable(new SignalXCommand(action.getJobId(), actionId));
                            log.debug("Recover a workflow action from [{0}] status and resubmit SignalXCommand :[{1}]", status, actionId);
                        } else if (status == WorkflowAction.Status.USER_RETRY) {
                            queueCallable(new ActionStartXCommand(actionId, action.getType()));
                            log.debug("Recover a workflow action from [USER_RETRY] status and resubmit ActionStartXCommand :[{0}]", actionId);
                        }
                    } catch (Exception e) {
                        // Best effort: one broken action must not prevent recovery of the others.
                        log.error("Exception, {0}", e.getMessage(), e);
                    }
                }
            } catch (JPAExecutorException e) {
                log.warn("Exception while reading pending actions from storage", e);
            }
        }

        /**
         * Buffers a command for immediate execution and hands the buffer to the {@code CallableQueueService}
         * once it holds exactly the configured batch size ({@code CONF_CALLABLE_BATCH_SIZE}, default 10).
         *
         * <p>After a hand-off attempt the buffer is replaced by a fresh list whether or not the queue accepted
         * the batch; a rejected batch is only reported with a warning.
         *
         * @param xCallable the command to buffer
         */
        private void queueCallable(XCallable<?> xCallable) {
            if (this.callables == null) {
                this.callables = new ArrayList<XCallable<?>>();
            }
            this.callables.add(xCallable);

            final int batchSize = Services.get().getConf().getInt(RecoveryService.CONF_CALLABLE_BATCH_SIZE, 10);
            if (this.callables.size() == batchSize) {
                final CallableQueueService queueService =
                        (CallableQueueService) Services.get().get(CallableQueueService.class);
                if (!queueService.queueSerial(this.callables)) {
                    XLog.getLog(getClass()).warn("Unable to queue the callables commands for RecoveryService. Most possibly command queue is full. Queue size is :" + queueService.queueSize());
                }
                this.callables = new ArrayList<XCallable<?>>();
            }
        }

        /**
         * Buffers a command for delayed execution and hands the buffer to the {@code CallableQueueService}
         * once it holds exactly the configured batch size ({@code CONF_CALLABLE_BATCH_SIZE}).
         *
         * <p>The whole batch is queued with a single delay: the largest delay requested for any command
         * buffered since the last hand-off (never less than 0). After a hand-off attempt the buffer and the
         * delay are reset whether or not the queue accepted the batch; a rejected batch is only reported with
         * a warning.
         *
         * <p>NOTE(review): the batch size is read through {@code ConfigurationService.getInt} here, while the
         * single-argument overload reads it from {@code Services.get().getConf()} with an explicit default of
         * 10 - confirm both resolve to the same value.
         *
         * @param xCallable the command to buffer
         * @param j         requested delay for this command, in the unit expected by
         *                  {@code CallableQueueService.queueSerial}
         */
        private void queueCallable(XCallable<?> xCallable, long j) {
            if (this.delayedCallables == null) {
                this.delayedCallables = new ArrayList<XCallable<?>>();
            }
            this.delay = Math.max(this.delay, j);
            this.delayedCallables.add(xCallable);

            final int batchSize = ConfigurationService.getInt(RecoveryService.CONF_CALLABLE_BATCH_SIZE);
            if (this.delayedCallables.size() == batchSize) {
                final CallableQueueService queueService =
                        (CallableQueueService) Services.get().get(CallableQueueService.class);
                if (!queueService.queueSerial(this.delayedCallables, this.delay)) {
                    XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. Most possibly Callable queue is full. Queue size is :" + queueService.queueSize());
                }
                this.delayedCallables = new ArrayList<XCallable<?>>();
                this.delay = 0L;
            }
        }
    }

    /**
     * Schedules the recurring recovery task on the {@code SchedulerService}.
     *
     * <p>The task is built from the workflow-action, coordinator and bundle "older than" settings and is
     * scheduled with an initial delay of 10 and a period of {@link #getRecoveryServiceInterval(Configuration)},
     * both expressed in {@code SchedulerService.Unit.SEC}.
     *
     * @param services the services container supplying the configuration and the scheduler
     */
    @Override
    public void init(Services services) {
        final Configuration conf = services.getConf();
        final SchedulerService scheduler = (SchedulerService) services.get(SchedulerService.class);

        final RecoveryRunnable recoveryTask = new RecoveryRunnable(
                ConfigurationService.getInt(conf, CONF_WF_ACTIONS_OLDER_THAN),
                ConfigurationService.getInt(conf, CONF_COORD_OLDER_THAN),
                ConfigurationService.getInt(conf, CONF_BUNDLE_OLDER_THAN));
        final int interval = getRecoveryServiceInterval(conf);

        scheduler.schedule(recoveryTask, 10L, interval, SchedulerService.Unit.SEC);
    }

    /**
     * Reads the recovery interval from the given configuration.
     *
     * @param configuration the configuration to read from
     * @return the value of {@code oozie.service.RecoveryService.interval} as resolved by
     *         {@code ConfigurationService.getInt}
     */
    public int getRecoveryServiceInterval(Configuration configuration) {
        final int interval = ConfigurationService.getInt(configuration, CONF_SERVICE_INTERVAL);
        return interval;
    }

    /**
     * Does nothing; in particular, the task scheduled by {@code init} is not cancelled here.
     */
    @Override
    public void destroy() {
        // Intentionally empty.
    }

    /**
     * Returns the service interface class, which for this service is {@code RecoveryService} itself.
     *
     * @return {@code RecoveryService.class}
     */
    @Override
    public Class<? extends Service> getInterface() {
        final Class<? extends Service> serviceInterface = RecoveryService.class;
        return serviceInterface;
    }

    /**
     * Builds the configuration for one coordinator element of a bundle.
     *
     * <p>Starts from the bundle job's stored configuration, merges in the element's {@code configuration}
     * child (when present) through {@code XConfiguration.copy}, sets
     * {@code OozieClient.COORDINATOR_APP_PATH} from the element's {@code app-path} child and finally lets
     * {@code JobUtils.normalizeAppPath} adjust the result.
     *
     * @param element       the coordinator element taken from the bundle job XML; must contain an
     *                      {@code app-path} child
     * @param bundleJobBean the bundle job whose configuration is the base of the merge
     * @return the merged configuration
     * @throws CommandException with {@code E1306} if the bundle configuration cannot be parsed, {@code E1307}
     *                          if the element's inline configuration cannot be parsed, or {@code E1001} if
     *                          normalizing the application path fails; the underlying {@code IOException} is
     *                          attached as the cause in all three cases
     */
    public static Configuration mergeConfig(Element element, BundleJobBean bundleJobBean) throws CommandException {
        XLog.Info.get().clear();
        XLog log = XLog.getLog("RecoveryService");
        String bundleConf = bundleJobBean.getConf();
        try {
            XConfiguration merged = new XConfiguration(new StringReader(bundleConf));
            Element inlineConfElement = element.getChild("configuration", element.getNamespace());
            if (inlineConfElement != null) {
                String inlineConfXml = XmlUtils.prettyPrint(inlineConfElement).toString();
                try {
                    XConfiguration.copy(new XConfiguration(new StringReader(inlineConfXml)), merged);
                } catch (IOException inlineParseError) {
                    log.warn("Configuration parse error in:" + inlineConfXml);
                    throw new CommandException(ErrorCode.E1307, inlineParseError.getMessage(), inlineParseError);
                }
            }
            merged.set(OozieClient.COORDINATOR_APP_PATH, element.getChild("app-path", element.getNamespace()).getValue());
            try {
                JobUtils.normalizeAppPath(merged.get("user.name"), merged.get(OozieClient.GROUP_NAME), merged);
                return merged;
            } catch (IOException normalizeError) {
                // Pass the exception as the trailing parameter so it is kept as the cause, matching the
                // other two throws in this method.
                throw new CommandException(ErrorCode.E1001, merged.get(OozieClient.COORDINATOR_APP_PATH), normalizeError);
            }
        } catch (IOException bundleParseError) {
            log.warn("Configuration parse error in:" + bundleConf);
            throw new CommandException(ErrorCode.E1306, bundleParseError.getMessage(), bundleParseError);
        }
    }
}
