package org.apache.oozie.service;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.DefaultConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.JMSExceptionListener;
import org.apache.oozie.jms.MessageHandler;
import org.apache.oozie.jms.MessageReceiver;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.util.XLog;

/* loaded from: input_file:WEB-INF/lib/oozie-core-4.3.0-mapr-1901.jar:org/apache/oozie/service/JMSAccessorService.class */
public class JMSAccessorService implements Service {
    /** Common prefix of every configuration key read by this service. */
    public static final String CONF_PREFIX = "oozie.service.JMSAccessorService.";
    /** Key naming the {@link ConnectionContext} implementation class to instantiate. */
    public static final String JMS_CONNECTION_CONTEXT_IMPL = "oozie.service.JMSAccessorService.connectioncontext.impl";
    /** Key for the integer session option passed to {@code ConnectionContext.createSession}. */
    public static final String SESSION_OPTS = "oozie.service.JMSAccessorService.jms.sessionOpts";
    /** Key for the delay, in seconds, before the first connection retry. */
    public static final String CONF_RETRY_INITIAL_DELAY = "oozie.service.JMSAccessorService.retry.initial.delay";
    /** Key for the factor the retry delay is multiplied by after every attempt. */
    public static final String CONF_RETRY_MULTIPLIER = "oozie.service.JMSAccessorService.retry.multiplier";
    /** Key for the maximum number of retry attempts per connection. */
    public static final String CONF_RETRY_MAX_ATTEMPTS = "oozie.service.JMSAccessorService.retry.max.attempts";
    /** Logger; assigned in {@code init}, so it is null until the service has been initialised. */
    private static XLog LOG;
    private Configuration conf;
    private int sessionOpts;
    private int retryInitialDelay;
    private int retryMultiplier;
    private int retryMaxAttempts;
    /** Single shared connection context used for producers. */
    private ConnectionContext jmsProducerConnContext;
    /** Established consumer connections, keyed by connection info. */
    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
            new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
    /** Per connection: topic name to the receiver listening on it. Inner maps are guarded by their own monitor. */
    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
            new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
    /**
     * Connections waiting for a retry, with the topics to re-register once they are back.
     * Retry runnables are handed to SchedulerService, so this map may be touched from a thread
     * other than the caller's; a concurrent map keeps the map structure itself consistent.
     * NOTE(review): the per-connection topic maps inside ConnectionRetryInfo are still plain HashMaps.
     */
    private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap =
            new ConcurrentHashMap<JMSConnectionInfo, ConnectionRetryInfo>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/oozie-core-4.3.0-mapr-1901.jar:org/apache/oozie/service/JMSAccessorService$ConnectionRetryInfo.class */
    /**
     * Mutable bookkeeping for one connection that is waiting to be retried: how many attempts
     * have been made, the delay to use for the next attempt, and the topics (with their
     * handlers) that still have to be registered.
     */
    public static class ConnectionRetryInfo {
        private final Map<String, MessageHandler> retryTopicsMap = new HashMap<String, MessageHandler>();
        private int numAttempt;
        private int nextDelay;

        /**
         * @param i  initial attempt count
         * @param i2 initial value of the next retry delay
         */
        public ConnectionRetryInfo(int i, int i2) {
            numAttempt = i;
            nextDelay = i2;
        }

        /** @return the number of attempts recorded so far */
        public int getNumAttempt() {
            return numAttempt;
        }

        /** @param i the new attempt count */
        public void setNumAttempt(int i) {
            numAttempt = i;
        }

        /** @return the delay to use for the next retry */
        public int getNextDelay() {
            return nextDelay;
        }

        /** @param i the new delay for the next retry */
        public void setNextDelay(int i) {
            nextDelay = i;
        }

        /** @return the live (not copied) map of topic name to handler awaiting registration */
        public Map<String, MessageHandler> getTopicsToRetry() {
            return retryTopicsMap;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/oozie-core-4.3.0-mapr-1901.jar:org/apache/oozie/service/JMSAccessorService$JMSRetryRunnable.class */
    /**
     * Task that asks the enclosing service to retry one connection when it is run.
     */
    public class JMSRetryRunnable implements Runnable {
        private final JMSConnectionInfo connInfo;

        /** @param jMSConnectionInfo the connection whose retry this task triggers */
        public JMSRetryRunnable(JMSConnectionInfo jMSConnectionInfo) {
            connInfo = jMSConnectionInfo;
        }

        /** @return the connection this task retries */
        public JMSConnectionInfo getJMSConnectionInfo() {
            return connInfo;
        }

        /** Delegates to {@code retryConnection} of the enclosing service; its result is ignored. */
        @Override // java.lang.Runnable
        public void run() {
            retryConnection(connInfo);
        }
    }

    /**
     * Initialises the logger and reads the service settings from the configuration.
     * Defaults: session option 1, initial retry delay 60, retry multiplier 2, max attempts 10.
     *
     * @param services provider of the configuration
     * @throws ServiceException declared by the Service contract; not thrown by this body directly
     */
    @Override // org.apache.oozie.service.Service
    public void init(Services services) throws ServiceException {
        LOG = XLog.getLog(getClass());
        final Configuration configuration = services.getConf();
        this.conf = configuration;
        this.sessionOpts = configuration.getInt(SESSION_OPTS, 1);
        this.retryInitialDelay = configuration.getInt(CONF_RETRY_INITIAL_DELAY, 60);
        this.retryMultiplier = configuration.getInt(CONF_RETRY_MULTIPLIER, 2);
        this.retryMaxAttempts = configuration.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
    }

    /**
     * Starts listening to a topic on the given connection, unless that is already the case.
     * <p>
     * If the topic is already queued for retry nothing happens. If the connection is queued for
     * retry, the topic is added to its retry list. Otherwise a receiver is registered; when the
     * connection or the registration cannot be obtained, the topic is queued for retry instead.
     *
     * @param jMSConnectionInfo connection to listen on
     * @param str               topic name
     * @param messageHandler    handler to receive the topic's messages
     */
    public void registerForNotification(JMSConnectionInfo jMSConnectionInfo, String str, MessageHandler messageHandler) {
        if (isTopicInRetryList(jMSConnectionInfo, str)) {
            return;
        }
        if (isConnectionInRetryList(jMSConnectionInfo)) {
            queueTopicForRetry(jMSConnectionInfo, str, messageHandler);
            return;
        }
        final Map<String, MessageReceiver> topicReceivers = getReceiversTopicsMap(jMSConnectionInfo);
        if (topicReceivers.containsKey(str)) {
            return;
        }
        synchronized (topicReceivers) {
            // Re-check under the lock: another caller may have registered the topic meanwhile.
            if (topicReceivers.containsKey(str)) {
                return;
            }
            final ConnectionContext context = createConnectionContext(jMSConnectionInfo);
            final MessageReceiver receiver =
                    context == null ? null : registerForTopic(jMSConnectionInfo, context, str, messageHandler);
            if (receiver == null) {
                // Either no connection or the registration failed; try again later.
                queueTopicForRetry(jMSConnectionInfo, str, messageHandler);
                return;
            }
            LOG.info("Registered a listener for topic {0} on {1}", str, jMSConnectionInfo);
            topicReceivers.put(str, receiver);
        }
    }

    /**
     * Stops listening to a topic on the given connection.
     * <p>
     * A topic that is only queued for retry is simply removed from the retry list. Otherwise its
     * receiver is removed and the receiver's session is closed; a failure to close is logged.
     * When no receivers are registered for the connection at all, nothing further happens.
     *
     * @param jMSConnectionInfo connection the topic belongs to
     * @param str               topic name
     */
    public void unregisterFromNotification(JMSConnectionInfo jMSConnectionInfo, String str) {
        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", jMSConnectionInfo, str);
        if (isTopicInRetryList(jMSConnectionInfo, str)) {
            removeTopicFromRetryList(jMSConnectionInfo, str);
            return;
        }
        final Map<String, MessageReceiver> topicReceivers = this.receiversMap.get(jMSConnectionInfo);
        if (topicReceivers == null) {
            return;
        }
        final MessageReceiver receiver;
        synchronized (topicReceivers) {
            receiver = topicReceivers.remove(str);
            // Drop the per-connection map once its last topic is gone.
            if (topicReceivers.isEmpty()) {
                this.receiversMap.remove(jMSConnectionInfo);
            }
        }
        if (receiver != null) {
            try {
                receiver.getSession().close();
            } catch (JMSException e) {
                LOG.warn("Unable to close session " + receiver.getSession(), e);
            }
        } else {
            LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.", str, jMSConnectionInfo);
        }
    }

    /**
     * Returns the topic-to-receiver map of a connection, creating and publishing an empty one
     * if none exists yet. If another thread publishes a map first, that one is returned.
     *
     * @param jMSConnectionInfo connection whose receivers are wanted
     * @return the map stored in {@code receiversMap} for the connection, never null
     */
    private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo jMSConnectionInfo) {
        final Map<String, MessageReceiver> existing = this.receiversMap.get(jMSConnectionInfo);
        if (existing != null) {
            return existing;
        }
        final Map<String, MessageReceiver> fresh = new HashMap<String, MessageReceiver>();
        final Map<String, MessageReceiver> raced = this.receiversMap.putIfAbsent(jMSConnectionInfo, fresh);
        return raced != null ? raced : fresh;
    }

    /**
     * Tells whether a receiver is currently registered for the topic on the connection.
     *
     * @param jMSConnectionInfo connection to look up
     * @param str               topic name
     * @return true if the connection has a receiver entry for the topic
     */
    @VisibleForTesting
    boolean isListeningToTopic(JMSConnectionInfo jMSConnectionInfo, String str) {
        final Map<String, MessageReceiver> topicReceivers = this.receiversMap.get(jMSConnectionInfo);
        if (topicReceivers == null) {
            return false;
        }
        return topicReceivers.containsKey(str);
    }

    /**
     * Tells whether the connection currently has an entry in the retry map.
     *
     * @param jMSConnectionInfo connection to look up
     * @return true if the connection is queued for retry
     */
    @VisibleForTesting
    boolean isConnectionInRetryList(JMSConnectionInfo jMSConnectionInfo) {
        return this.retryConnectionsMap.containsKey(jMSConnectionInfo);
    }

    /**
     * Tells whether the topic is waiting to be registered once the connection is retried.
     *
     * @param jMSConnectionInfo connection to look up
     * @param str               topic name
     * @return true if the connection is queued for retry and its retry list contains the topic
     */
    @VisibleForTesting
    boolean isTopicInRetryList(JMSConnectionInfo jMSConnectionInfo, String str) {
        final ConnectionRetryInfo retryInfo = this.retryConnectionsMap.get(jMSConnectionInfo);
        return retryInfo != null && retryInfo.getTopicsToRetry().containsKey(str);
    }

    /**
     * Returns the number of retry attempts recorded for a connection.
     *
     * @param jMSConnectionInfo connection to look up; must currently be queued for retry
     * @return the recorded attempt count
     * @throws NullPointerException if the connection has no entry in the retry map
     */
    @VisibleForTesting
    int getNumConnectionAttempts(JMSConnectionInfo jMSConnectionInfo) {
        final ConnectionRetryInfo retryInfo = this.retryConnectionsMap.get(jMSConnectionInfo);
        return retryInfo.getNumAttempt();
    }

    /**
     * Ensures the connection is queued for retry. The first time a connection is queued, a retry
     * record is created with zero attempts and a retry is scheduled after the initial delay.
     *
     * @param jMSConnectionInfo connection to queue
     * @return the retry record for the connection (existing or newly created)
     */
    private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo jMSConnectionInfo) {
        final ConnectionRetryInfo existing = this.retryConnectionsMap.get(jMSConnectionInfo);
        if (existing != null) {
            // Already queued; a retry has been scheduled for it before.
            return existing;
        }
        LOG.info("Queueing connection {0} for retry", jMSConnectionInfo);
        final ConnectionRetryInfo created = new ConnectionRetryInfo(0, this.retryInitialDelay);
        this.retryConnectionsMap.put(jMSConnectionInfo, created);
        scheduleRetry(jMSConnectionInfo, this.retryInitialDelay);
        return created;
    }

    /**
     * Queues the connection for retry (if it is not already) and records the topic and its
     * handler so the topic is registered when the connection is retried.
     *
     * @param jMSConnectionInfo connection the topic belongs to
     * @param str               topic name
     * @param messageHandler    handler to register for the topic later
     * @return the retry record of the connection
     */
    private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo jMSConnectionInfo, String str, MessageHandler messageHandler) {
        LOG.info("Queueing topic {0} for {1} for retry", str, jMSConnectionInfo);
        final ConnectionRetryInfo retryInfo = queueConnectionForRetry(jMSConnectionInfo);
        final Map<String, MessageHandler> pendingTopics = retryInfo.getTopicsToRetry();
        pendingTopics.put(str, messageHandler);
        return retryInfo;
    }

    /**
     * Removes a topic from the retry list of a connection; does nothing beyond logging when the
     * connection is not queued for retry. The connection's retry record itself is kept.
     *
     * @param jMSConnectionInfo connection the topic belongs to
     * @param str               topic name
     */
    private void removeTopicFromRetryList(JMSConnectionInfo jMSConnectionInfo, String str) {
        LOG.info("Removing topic {0} from {1} from retry list", str, jMSConnectionInfo);
        final ConnectionRetryInfo retryInfo = this.retryConnectionsMap.get(jMSConnectionInfo);
        if (retryInfo == null) {
            return;
        }
        retryInfo.getTopicsToRetry().remove(str);
    }

    /**
     * Creates a session and a consumer for the topic and attaches a new receiver wrapping the
     * given handler as the consumer's message listener.
     * <p>
     * If any step after the session creation fails, the session is closed again so that a failed
     * registration does not leave an open session behind.
     *
     * @param jMSConnectionInfo connection info, used for logging only
     * @param connectionContext context used to create the session and the consumer
     * @param str               topic name
     * @param messageHandler    handler the receiver delegates to
     * @return the registered receiver, or null if a JMSException occurred
     */
    private MessageReceiver registerForTopic(JMSConnectionInfo jMSConnectionInfo, ConnectionContext connectionContext, String str, MessageHandler messageHandler) {
        Session session = null;
        try {
            session = connectionContext.createSession(this.sessionOpts);
            MessageConsumer consumer = connectionContext.createConsumer(session, str);
            MessageReceiver receiver = new MessageReceiver(messageHandler, session, consumer);
            consumer.setMessageListener(receiver);
            return receiver;
        } catch (JMSException e) {
            LOG.warn("Error while registering to listen to topic {0} from {1}", str, jMSConnectionInfo, e);
            if (session != null) {
                // The session was opened but never handed to a caller; release it here.
                try {
                    session.close();
                } catch (JMSException closeError) {
                    LOG.warn("Unable to close session " + session, closeError);
                }
            }
            return null;
        }
    }

    /**
     * Returns the cached connection context for the given connection info, establishing and
     * caching a new connection when none is cached.
     * <p>
     * If the connection is opened but a later setup step fails, it is closed again instead of
     * being left open and unreferenced. If another thread caches a context for the same
     * connection info first, the context created here is closed and the cached one is returned,
     * so the map never silently replaces (and thereby leaks) a live connection.
     *
     * @param jMSConnectionInfo connection to obtain a context for
     * @return the connection context, or null if the connection could not be established
     */
    public ConnectionContext createConnectionContext(JMSConnectionInfo jMSConnectionInfo) {
        ConnectionContext cached = this.connectionMap.get(jMSConnectionInfo);
        if (cached != null) {
            return cached;
        }
        ConnectionContext created = null;
        boolean connected = false;
        try {
            created = getConnectionContextImpl();
            created.createConnection(jMSConnectionInfo.getJNDIProperties());
            connected = true;
            created.setExceptionListener(new JMSExceptionListener(jMSConnectionInfo, created, true));
        } catch (Exception e) {
            LOG.warn("Exception while establishing connection to JMS Server for [{0}]", jMSConnectionInfo, e);
            if (connected) {
                // Opened but not fully set up: release it, keeping the null-on-failure contract.
                try {
                    created.close();
                } catch (RuntimeException closeError) {
                    LOG.warn("Unable to close partially initialized connection for [{0}]", jMSConnectionInfo, closeError);
                }
            }
            return null;
        }
        ConnectionContext raced = this.connectionMap.putIfAbsent(jMSConnectionInfo, created);
        if (raced != null) {
            // Someone else connected first; keep theirs and release the duplicate.
            try {
                created.close();
            } catch (RuntimeException closeError) {
                LOG.warn("Unable to close duplicate connection for [{0}]", jMSConnectionInfo, closeError);
            }
            return raced;
        }
        LOG.info("Connection established to JMS Server for [{0}]", jMSConnectionInfo);
        return created;
    }

    /**
     * Returns the shared producer connection context, establishing it on first use or when the
     * current one reports that its connection is not initialized. The connection info is only
     * used when a new connection has to be created.
     * <p>
     * The new context is built in a local variable and stored in the field only after it is
     * fully set up. A context that fails part-way is therefore never cached or returned by a
     * later call, and a connection that was opened before the failure is closed.
     *
     * @param jMSConnectionInfo connection to use when a new producer connection is needed
     * @return the producer connection context, or null if the connection could not be established
     */
    public ConnectionContext createProducerConnectionContext(JMSConnectionInfo jMSConnectionInfo) {
        ConnectionContext current = this.jmsProducerConnContext;
        if (current != null && current.isConnectionInitialized()) {
            return current;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have connected meanwhile.
            current = this.jmsProducerConnContext;
            if (current != null && current.isConnectionInitialized()) {
                return current;
            }
            ConnectionContext created = null;
            boolean connected = false;
            try {
                created = getConnectionContextImpl();
                created.createConnection(jMSConnectionInfo.getJNDIProperties());
                connected = true;
                created.setExceptionListener(new JMSExceptionListener(jMSConnectionInfo, created, false));
            } catch (Exception e) {
                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", jMSConnectionInfo, e);
                if (connected) {
                    // Opened but not fully set up: release it rather than caching it.
                    try {
                        created.close();
                    } catch (RuntimeException closeError) {
                        LOG.warn("Unable to close partially initialized connection for [{0}]", jMSConnectionInfo, closeError);
                    }
                }
                return null;
            }
            this.jmsProducerConnContext = created;
            LOG.info("Connection established to JMS Server for [{0}]", jMSConnectionInfo);
            return created;
        }
    }

    /**
     * Instantiates the connection context class named by the configuration. The default
     * implementation is constructed directly; any other class is created reflectively.
     *
     * @return a new, not yet connected connection context
     */
    private ConnectionContext getConnectionContextImpl() {
        final Class<?> implClass = ConfigurationService.getClass(this.conf, JMS_CONNECTION_CONTEXT_IMPL);
        if (implClass == DefaultConnectionContext.class) {
            return new DefaultConnectionContext();
        }
        return (ConnectionContext) ReflectionUtils.newInstance(implClass, (Configuration) null);
    }

    /**
     * Looks up the receiver registered for a topic on a connection.
     *
     * @param jMSConnectionInfo connection to look up
     * @param str               topic name
     * @return the receiver, or null if the connection or the topic has no receiver
     */
    @VisibleForTesting
    MessageReceiver getMessageReceiver(JMSConnectionInfo jMSConnectionInfo, String str) {
        final Map<String, MessageReceiver> topicReceivers = this.receiversMap.get(jMSConnectionInfo);
        return topicReceivers == null ? null : topicReceivers.get(str);
    }

    /**
     * Shuts the service down: forgets all receivers, closes every cached consumer connection and
     * the producer connection (if any), then empties the connection cache.
     */
    @Override // org.apache.oozie.service.Service
    public void destroy() {
        LOG.info("Destroying JMSAccessor service ");
        this.receiversMap.clear();
        LOG.info("Closing JMS connections");
        for (ConnectionContext context : this.connectionMap.values()) {
            context.close();
        }
        if (this.jmsProducerConnContext != null) {
            this.jmsProducerConnContext.close();
        }
        this.connectionMap.clear();
    }

    /**
     * @return the class under which this service is exposed, {@code JMSAccessorService.class}
     */
    @Override // org.apache.oozie.service.Service
    public Class<? extends Service> getInterface() {
        return JMSAccessorService.class;
    }

    /**
     * Drops the cached connection for the given connection info and queues it for retry. Every
     * topic that had a receiver on the connection is moved to the retry list together with its
     * handler. The dropped connection context is not closed here.
     *
     * @param jMSConnectionInfo connection to re-establish
     */
    public void reestablishConnection(JMSConnectionInfo jMSConnectionInfo) {
        this.connectionMap.remove(jMSConnectionInfo);
        final ConnectionRetryInfo retryInfo = queueConnectionForRetry(jMSConnectionInfo);
        final Map<String, MessageReceiver> orphanedReceivers = this.receiversMap.remove(jMSConnectionInfo);
        if (orphanedReceivers == null) {
            return;
        }
        final Map<String, MessageHandler> pendingTopics = retryInfo.getTopicsToRetry();
        for (Map.Entry<String, MessageReceiver> registration : orphanedReceivers.entrySet()) {
            final MessageHandler handler = registration.getValue().getMessageHandler();
            pendingTopics.put(registration.getKey(), handler);
        }
    }

    /**
     * Schedules a retry of the connection with the scheduler service.
     *
     * @param jMSConnectionInfo connection to retry
     * @param j                 delay before the retry, in seconds
     */
    private void scheduleRetry(JMSConnectionInfo jMSConnectionInfo, long j) {
        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", jMSConnectionInfo, Long.valueOf(j));
        final SchedulerService scheduler = (SchedulerService) Services.get().get(SchedulerService.class);
        final Runnable retryTask = new JMSRetryRunnable(jMSConnectionInfo);
        scheduler.schedule(retryTask, j, SchedulerService.Unit.SEC);
    }

    /**
     * Performs one retry for a connection that is queued for retry.
     * <p>
     * The attempt counter is incremented and the next delay multiplied by the retry multiplier.
     * If the connection can be obtained, every topic on the retry list is registered; topics that
     * are registered (now or already) leave the list. Another retry is scheduled when the
     * connection could not be obtained <em>or</em> when topics are still waiting. Previously a
     * failed topic registration dropped the whole retry record, silently losing those topics.
     * Only when nothing is left to do is the connection removed from the retry map.
     *
     * @param jMSConnectionInfo connection to retry
     * @return true if an attempt was made; false if the maximum number of attempts has been
     *         reached or the connection is not queued for retry
     */
    @VisibleForTesting
    boolean retryConnection(JMSConnectionInfo jMSConnectionInfo) {
        ConnectionRetryInfo retryInfo = this.retryConnectionsMap.get(jMSConnectionInfo);
        if (retryInfo == null) {
            // Nothing is queued for this connection; avoid failing the calling thread with an NPE.
            LOG.warn("No retry information found for connection [{0}]; nothing to retry", jMSConnectionInfo);
            return false;
        }
        if (retryInfo.getNumAttempt() >= this.retryMaxAttempts) {
            LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", jMSConnectionInfo, Integer.valueOf(this.retryMaxAttempts));
            return false;
        }
        LOG.info("Attempting retry of connection [{0}]", jMSConnectionInfo);
        retryInfo.setNumAttempt(retryInfo.getNumAttempt() + 1);
        retryInfo.setNextDelay(retryInfo.getNextDelay() * this.retryMultiplier);
        ConnectionContext context = createConnectionContext(jMSConnectionInfo);
        boolean retryAgain;
        if (context == null) {
            retryAgain = true;
        } else {
            Map<String, MessageHandler> pendingTopics = retryInfo.getTopicsToRetry();
            Map<String, MessageReceiver> topicReceivers = getReceiversTopicsMap(jMSConnectionInfo);
            Iterator<Map.Entry<String, MessageHandler>> pending = pendingTopics.entrySet().iterator();
            while (pending.hasNext()) {
                Map.Entry<String, MessageHandler> entry = pending.next();
                String topic = entry.getKey();
                boolean registered;
                synchronized (topicReceivers) {
                    if (topicReceivers.containsKey(topic)) {
                        // Already listening; the topic no longer needs a retry.
                        registered = true;
                    } else {
                        MessageReceiver receiver = registerForTopic(jMSConnectionInfo, context, topic, entry.getValue());
                        if (receiver == null) {
                            LOG.warn("Failed to register a listener for topic {0} on {1}", topic, jMSConnectionInfo);
                            registered = false;
                        } else {
                            topicReceivers.put(topic, receiver);
                            LOG.info("Registered a listener for topic {0} on {1}", topic, jMSConnectionInfo);
                            registered = true;
                        }
                    }
                }
                if (registered) {
                    pending.remove();
                }
            }
            // Topics that failed to register stay queued and are retried with the connection.
            retryAgain = !pendingTopics.isEmpty();
        }
        if (retryAgain) {
            scheduleRetry(jMSConnectionInfo, retryInfo.getNextDelay());
        } else {
            this.retryConnectionsMap.remove(jMSConnectionInfo);
        }
        return true;
    }
}
