package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.event.BundleJobEvent;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.event.MemoryEventQueue;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.sla.listener.SLAEventListener;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;

/* loaded from: input_file:WEB-INF/lib/oozie-core-4.2.0-mapr-1508.jar:org/apache/oozie/service/EventHandlerService.class */
public class EventHandlerService implements Service {
    public static final String CONF_PREFIX = "oozie.service.EventHandlerService.";
    public static final String CONF_QUEUE_SIZE = "oozie.service.EventHandlerService.queue.size";
    public static final String CONF_EVENT_QUEUE = "oozie.service.EventHandlerService.event.queue";
    public static final String CONF_LISTENERS = "oozie.service.EventHandlerService.event.listeners";
    public static final String CONF_FILTER_APP_TYPES = "oozie.service.EventHandlerService.filter.app.types";
    public static final String CONF_BATCH_SIZE = "oozie.service.EventHandlerService.batch.size";
    public static final String CONF_WORKER_THREADS = "oozie.service.EventHandlerService.worker.threads";
    public static final String CONF_WORKER_INTERVAL = "oozie.service.EventHandlerService.worker.interval";
    private static EventQueue eventQueue;
    private XLog LOG;
    private Map<Event.MessageType, List<?>> listenerMap = new HashMap();
    private Set<String> apptypes;
    private static boolean eventsEnabled = false;
    private int numWorkers;

    /* loaded from: input_file:WEB-INF/lib/oozie-core-4.2.0-mapr-1508.jar:org/apache/oozie/service/EventHandlerService$EventWorker.class */
    public class EventWorker implements Runnable {
        public EventWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                if (!EventHandlerService.eventQueue.isEmpty()) {
                    for (Event event : EventHandlerService.eventQueue.pollBatch()) {
                        EventHandlerService.this.LOG = LogUtils.setLogPrefix(EventHandlerService.this.LOG, event);
                        EventHandlerService.this.LOG.debug("Processing event : {0}", event);
                        Event.MessageType msgType = event.getMsgType();
                        List list = (List) EventHandlerService.this.listenerMap.get(msgType);
                        if (list != null) {
                            Iterator it = list.iterator();
                            while (it.hasNext()) {
                                try {
                                    if (msgType == Event.MessageType.JOB) {
                                        invokeJobEventListener((JobEventListener) it.next(), (JobEvent) event);
                                    } else if (msgType == Event.MessageType.SLA) {
                                        invokeSLAEventListener((SLAEventListener) it.next(), (SLAEvent) event);
                                    } else {
                                        it.next();
                                    }
                                } catch (Throwable th) {
                                    XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", th);
                                }
                            }
                        }
                    }
                }
            } catch (Throwable th2) {
                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", th2);
            }
        }

        private void invokeJobEventListener(JobEventListener jobEventListener, JobEvent jobEvent) {
            switch (jobEvent.getAppType()) {
                case WORKFLOW_JOB:
                    jobEventListener.onWorkflowJobEvent((WorkflowJobEvent) jobEvent);
                    return;
                case WORKFLOW_ACTION:
                    jobEventListener.onWorkflowActionEvent((WorkflowActionEvent) jobEvent);
                    return;
                case COORDINATOR_JOB:
                    jobEventListener.onCoordinatorJobEvent((CoordinatorJobEvent) jobEvent);
                    return;
                case COORDINATOR_ACTION:
                    jobEventListener.onCoordinatorActionEvent((CoordinatorActionEvent) jobEvent);
                    return;
                case BUNDLE_JOB:
                    jobEventListener.onBundleJobEvent((BundleJobEvent) jobEvent);
                    return;
                default:
                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}", jobEvent.getAppType());
                    return;
            }
        }

        private void invokeSLAEventListener(SLAEventListener sLAEventListener, SLAEvent sLAEvent) {
            switch (sLAEvent.getEventStatus()) {
                case START_MET:
                    sLAEventListener.onStartMet(sLAEvent);
                    return;
                case START_MISS:
                    sLAEventListener.onStartMiss(sLAEvent);
                    return;
                case END_MET:
                    sLAEventListener.onEndMet(sLAEvent);
                    return;
                case END_MISS:
                    sLAEventListener.onEndMiss(sLAEvent);
                    return;
                case DURATION_MET:
                    sLAEventListener.onDurationMet(sLAEvent);
                    return;
                case DURATION_MISS:
                    sLAEventListener.onDurationMiss(sLAEvent);
                    return;
                default:
                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", sLAEvent.getSLAStatus());
                    return;
            }
        }
    }

    @Override // org.apache.oozie.service.Service
    public void init(Services services) throws ServiceException {
        try {
            Configuration conf = services.getConf();
            this.LOG = XLog.getLog(getClass());
            Class<?> cls = ConfigurationService.getClass(conf, CONF_EVENT_QUEUE);
            eventQueue = cls == null ? new MemoryEventQueue() : (EventQueue) cls.newInstance();
            eventQueue.init(conf);
            initApptypes(conf);
            initEventListeners(conf);
            initWorkerThreads(conf, services);
            eventsEnabled = true;
            this.LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}], Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass().getName(), this.listenerMap.toString(), this.apptypes, Integer.valueOf(this.numWorkers));
        } catch (Exception e) {
            throw new ServiceException(ErrorCode.E0100, e.getMessage(), e);
        }
    }

    private void initApptypes(Configuration configuration) {
        this.apptypes = new HashSet();
        for (String str : ConfigurationService.getStrings(configuration, CONF_FILTER_APP_TYPES)) {
            String lowerCase = str.trim().toLowerCase();
            if (lowerCase.length() != 0) {
                this.apptypes.add(lowerCase);
            }
        }
    }

    private void initEventListeners(Configuration configuration) throws Exception {
        Class<?>[] classes = ConfigurationService.getClasses(configuration, CONF_LISTENERS);
        for (int i = 0; i < classes.length; i++) {
            Object obj = null;
            try {
                obj = classes[i].newInstance();
            } catch (IllegalAccessException e) {
                this.LOG.warn("Illegal access to event listener instance, " + e);
            } catch (InstantiationException e2) {
                this.LOG.warn("Could not create event listener instance, " + e2);
            }
            addEventListener(obj, configuration, classes[i].getName());
        }
    }

    public void addEventListener(Object obj, Configuration configuration, String str) throws Exception {
        if (obj instanceof JobEventListener) {
            List<?> list = this.listenerMap.get(Event.MessageType.JOB);
            if (list == null) {
                list = new ArrayList();
                this.listenerMap.put(Event.MessageType.JOB, list);
            }
            list.add(obj);
            ((JobEventListener) obj).init(configuration);
            return;
        }
        if (!(obj instanceof SLAEventListener)) {
            this.LOG.warn("Event listener [{0}] is of undefined type", str);
            return;
        }
        List<?> list2 = this.listenerMap.get(Event.MessageType.SLA);
        if (list2 == null) {
            list2 = new ArrayList();
            this.listenerMap.put(Event.MessageType.SLA, list2);
        }
        list2.add(obj);
        ((SLAEventListener) obj).init(configuration);
    }

    public static boolean isEnabled() {
        return eventsEnabled;
    }

    private void initWorkerThreads(Configuration configuration, Services services) throws ServiceException {
        this.numWorkers = ConfigurationService.getInt(configuration, CONF_WORKER_THREADS);
        int i = ConfigurationService.getInt(configuration, CONF_WORKER_INTERVAL);
        SchedulerService schedulerService = (SchedulerService) services.get(SchedulerService.class);
        if (this.numWorkers + 3 > schedulerService.getSchedulableThreads(configuration)) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested [" + this.numWorkers + "] cannot be handled with current settings. Increase " + SchedulerService.SCHEDULER_THREADS);
        }
        EventWorker eventWorker = new EventWorker();
        for (int i2 = 0; i2 < this.numWorkers; i2++) {
            schedulerService.schedule(eventWorker, 10 + (i2 * 20), i, SchedulerService.Unit.SEC);
        }
    }

    @Override // org.apache.oozie.service.Service
    public void destroy() {
        eventsEnabled = false;
        for (Event.MessageType messageType : this.listenerMap.keySet()) {
            Iterator<?> it = this.listenerMap.get(messageType).iterator();
            while (it.hasNext()) {
                if (messageType == Event.MessageType.JOB) {
                    ((JobEventListener) it.next()).destroy();
                } else if (messageType == Event.MessageType.SLA) {
                    ((SLAEventListener) it.next()).destroy();
                }
            }
        }
    }

    @Override // org.apache.oozie.service.Service
    public Class<? extends Service> getInterface() {
        return EventHandlerService.class;
    }

    public boolean isSupportedApptype(String str) {
        return this.apptypes.contains(str.toLowerCase());
    }

    public void setAppTypes(Set<String> set) {
        this.apptypes = set;
    }

    public Set<String> getAppTypes() {
        return this.apptypes;
    }

    public String listEventListeners() {
        return this.listenerMap.toString();
    }

    public void queueEvent(Event event) {
        this.LOG = LogUtils.setLogPrefix(this.LOG, event);
        this.LOG.debug("Queueing event : {0}", event);
        this.LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
        eventQueue.add(event);
        LogUtils.clearLogPrefix();
    }

    public EventQueue getEventQueue() {
        return eventQueue;
    }
}
