package org.apache.oozie.jms;

import java.util.Date;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.jms.JMSMessagingUtils;
import org.apache.oozie.client.event.message.CoordinatorActionMessage;
import org.apache.oozie.client.event.message.WorkflowJobMessage;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.JMSTopicService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.DateUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/oozie/jms/TestJMSJobEventListener.class */
public class TestJMSJobEventListener extends XTestCase {
    private Services services;
    private Configuration conf;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.oozie.test.XTestCase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.services = new Services();
        this.conf = this.services.getConf();
        this.conf.set("oozie.services.ext", JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName());
        this.conf.set("oozie.jms.producer.connection.properties", "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false&broker.useJmx=false;connectionFactoryNames#ConnectionFactory");
        this.services.init();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.oozie.test.XTestCase
    @After
    public void tearDown() throws Exception {
        this.services.destroy();
        super.tearDown();
    }

    @Test
    public void testOnWorkflowJobStartedEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.RUNNING, "user1", "wf-app-name1", parseDateUTC, (Date) null);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent));
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        TextMessage receive = createConsumer.receive(5000L);
        assertFalse(receive.getText().contains("endTime"));
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(receive);
        assertEquals(WorkflowJob.Status.RUNNING, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals("wfId1", eventMessage.getId());
        assertEquals("caId1", eventMessage.getParentId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.WORKFLOW_JOB, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.STARTED, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testOnWorkflowJobSuccessEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        Date date = new Date();
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUCCEEDED, "user1", "wf-app-name1", parseDateUTC, date);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent));
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(createConsumer.receive(5000L));
        assertEquals(WorkflowJob.Status.SUCCEEDED, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals(date, eventMessage.getEndTime());
        assertEquals("wfId1", eventMessage.getId());
        assertEquals("caId1", eventMessage.getParentId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.WORKFLOW_JOB, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.SUCCESS, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testOnWorkflowJobFailureEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        Date date = new Date();
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", parseDateUTC, date);
        workflowJobEvent.setErrorCode("dummyErrorCode");
        workflowJobEvent.setErrorMessage("dummyErrorMessage");
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent));
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(createConsumer.receive(5000L));
        assertEquals(WorkflowJob.Status.FAILED, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals(date, eventMessage.getEndTime());
        assertEquals("wfId1", eventMessage.getId());
        assertEquals("caId1", eventMessage.getParentId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.WORKFLOW_JOB, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.FAILURE, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
        assertEquals("dummyErrorCode", eventMessage.getErrorCode());
        assertEquals("dummyErrorMessage", eventMessage.getErrorMessage());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testOnWorkflowJobSuspendEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1", "wf-app-name1", parseDateUTC, (Date) null);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent));
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        TextMessage receive = createConsumer.receive(5000L);
        assertFalse(receive.getText().contains("endTime"));
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(receive);
        assertEquals(WorkflowJob.Status.SUSPENDED, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals("wfId1", eventMessage.getId());
        assertEquals("caId1", eventMessage.getParentId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.WORKFLOW_JOB, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.SUSPEND, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
        assertNull(eventMessage.getErrorCode());
        assertNull(eventMessage.getErrorMessage());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testWorkflowJobSelectors() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1", "wf-app-name1", new Date(), new Date());
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent), "user='user_1'");
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(createConsumer.receive(5000L));
        Assert.assertEquals(WorkflowJob.Status.FAILED, eventMessage.getStatus());
        assertEquals("user_1", eventMessage.getUser());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testWorkflowJobSelectorsNegative() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date());
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent), "user='Non_matching_user'");
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        assertNull(createConsumer.receive(5000L));
        jMSJobEventListener.destroy();
    }

    @Test
    public void testWorkflowJobSelectorsOr() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date());
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent), "user='Non_matching_user' OR appName='wf-app-name1'");
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(createConsumer.receive(5000L));
        Assert.assertEquals(WorkflowJob.Status.FAILED, eventMessage.getStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testWorkflowJobSelectorsAnd() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date());
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent), "eventStatus='FAILURE' AND appType='WORKFLOW_JOB' AND msgType='JOB'");
        jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
        WorkflowJobMessage eventMessage = JMSMessagingUtils.getEventMessage(createConsumer.receive(5000L));
        Assert.assertEquals(WorkflowJob.Status.FAILED, eventMessage.getStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        jMSJobEventListener.destroy();
    }

    @Test
    public void testConnectionDrop() throws Exception {
        BrokerService brokerService = null;
        try {
            this.services.destroy();
            this.services = new Services();
            Configuration conf = this.services.getConf();
            conf.set("oozie.services.ext", JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName());
            this.services.init();
            BrokerService brokerService2 = new BrokerService();
            brokerService2.setDataDirectory(getTestCaseDir());
            brokerService2.addConnector("tcp://localhost:0");
            assertFalse("There must be at least one transport connector initialised.", brokerService2.getTransportConnectors().isEmpty());
            String uri = ((TransportConnector) brokerService2.getTransportConnectors().get(0)).getConnectUri().toString();
            conf.set("oozie.jms.producer.connection.properties", "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" + uri + ";connectionFactoryNames#ConnectionFactory");
            JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
            jMSJobEventListener.init(conf);
            brokerService2.setUseJmx(false);
            brokerService2.start();
            assertNotNull(getConnectionContext());
            brokerService2.stop();
            assertNull(getConnectionContext());
            brokerService = new BrokerService();
            brokerService.setDataDirectory(getTestCaseDir());
            brokerService.addConnector(uri);
            brokerService.setUseJmx(false);
            brokerService.start();
            WorkflowJobEvent workflowJobEvent = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date());
            ConnectionContext connectionContext = getConnectionContext();
            MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(workflowJobEvent));
            jMSJobEventListener.onWorkflowJobEvent(workflowJobEvent);
            assertNotNull(createConsumer.receive(5000L));
            brokerService.stop();
            jMSJobEventListener.destroy();
            if (brokerService != null) {
                brokerService.stop();
            }
        } catch (Throwable th) {
            if (brokerService != null) {
                brokerService.stop();
            }
            throw th;
        }
    }

    @Test
    public void testOnCoordinatorActionWaitingEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        Date parseDateUTC2 = DateUtils.parseDateUTC("2011-07-11T00:00Z");
        CoordinatorActionEvent coordinatorActionEvent = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING, "user1", "wf-app-name1", parseDateUTC2, parseDateUTC, "missingDep1");
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(coordinatorActionEvent));
        jMSJobEventListener.onCoordinatorActionEvent(coordinatorActionEvent);
        TextMessage receive = createConsumer.receive(5000L);
        assertFalse(receive.getText().contains("endTime"));
        assertFalse(receive.getText().contains("errorCode"));
        assertFalse(receive.getText().contains("errorMessage"));
        CoordinatorActionMessage eventMessage = JMSMessagingUtils.getEventMessage(receive);
        assertEquals(CoordinatorAction.Status.WAITING, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals(parseDateUTC2, eventMessage.getNominalTime());
        assertEquals("caJobId1", eventMessage.getParentId());
        assertEquals("caId1", eventMessage.getId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.COORDINATOR_ACTION, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.WAITING, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
        assertEquals("missingDep1", eventMessage.getMissingDependency());
    }

    @Test
    public void testOnCoordinatorActionStartEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        CoordinatorActionEvent coordinatorActionEvent = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING, "user1", "wf-app-name1", DateUtils.parseDateUTC("2011-07-11T00:00Z"), parseDateUTC, (String) null);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(coordinatorActionEvent));
        jMSJobEventListener.onCoordinatorActionEvent(coordinatorActionEvent);
        TextMessage receive = createConsumer.receive(5000L);
        assertFalse(receive.getText().contains("endTime"));
        assertFalse(receive.getText().contains("errorCode"));
        assertFalse(receive.getText().contains("errorMessage"));
        assertFalse(receive.getText().contains("missingDependency"));
        CoordinatorActionMessage eventMessage = JMSMessagingUtils.getEventMessage(receive);
        assertEquals(CoordinatorAction.Status.RUNNING, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals("caJobId1", eventMessage.getParentId());
        assertEquals("caId1", eventMessage.getId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.COORDINATOR_ACTION, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.STARTED, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
    }

    @Test
    public void testOnCoordinatorJobSuccessEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        Date parseDateUTC2 = DateUtils.parseDateUTC("2011-07-11T00:00Z");
        Date date = new Date();
        CoordinatorActionEvent coordinatorActionEvent = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", parseDateUTC2, parseDateUTC, (String) null);
        coordinatorActionEvent.setEndTime(date);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(coordinatorActionEvent));
        jMSJobEventListener.onCoordinatorActionEvent(coordinatorActionEvent);
        TextMessage receive = createConsumer.receive(5000L);
        assertFalse(receive.getText().contains("errorCode"));
        assertFalse(receive.getText().contains("errorMessage"));
        assertFalse(receive.getText().contains("missingDependency"));
        CoordinatorActionMessage eventMessage = JMSMessagingUtils.getEventMessage(receive);
        assertEquals(CoordinatorAction.Status.SUCCEEDED, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals(date, eventMessage.getEndTime());
        assertEquals("caJobId1", eventMessage.getParentId());
        assertEquals("caId1", eventMessage.getId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.COORDINATOR_ACTION, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.SUCCESS, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
    }

    @Test
    public void testOnCoordinatorJobFailureEvent() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        Date parseDateUTC2 = DateUtils.parseDateUTC("2011-07-11T00:00Z");
        Date date = new Date();
        CoordinatorActionEvent coordinatorActionEvent = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", parseDateUTC2, parseDateUTC, (String) null);
        coordinatorActionEvent.setEndTime(date);
        coordinatorActionEvent.setErrorCode("E0101");
        coordinatorActionEvent.setErrorMessage("dummyError");
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(coordinatorActionEvent));
        jMSJobEventListener.onCoordinatorActionEvent(coordinatorActionEvent);
        TextMessage receive = createConsumer.receive(5000L);
        assertFalse(receive.getText().contains("missingDependency"));
        CoordinatorActionMessage eventMessage = JMSMessagingUtils.getEventMessage(receive);
        assertEquals(CoordinatorAction.Status.FAILED, eventMessage.getStatus());
        assertEquals(parseDateUTC, eventMessage.getStartTime());
        assertEquals(date, eventMessage.getEndTime());
        assertEquals("caJobId1", eventMessage.getParentId());
        assertEquals("caId1", eventMessage.getId());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
        assertEquals(AppType.COORDINATOR_ACTION, eventMessage.getAppType());
        assertEquals(JobEvent.EventStatus.FAILURE, eventMessage.getEventStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals("wf-app-name1", eventMessage.getAppName());
        assertEquals("E0101", eventMessage.getErrorCode());
        assertEquals("dummyError", eventMessage.getErrorMessage());
    }

    @Test
    public void testCoordinatorActionSelectors() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        CoordinatorActionEvent coordinatorActionEvent = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", DateUtils.parseDateUTC("2011-07-11T00:00Z"), parseDateUTC, (String) null);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(coordinatorActionEvent), "user='user1'");
        jMSJobEventListener.onCoordinatorActionEvent(coordinatorActionEvent);
        CoordinatorActionMessage eventMessage = JMSMessagingUtils.getEventMessage(createConsumer.receive(5000L));
        Assert.assertEquals(CoordinatorAction.Status.FAILED, eventMessage.getStatus());
        assertEquals("user1", eventMessage.getUser());
        assertEquals(Event.MessageType.JOB, eventMessage.getMessageType());
    }

    @Test
    public void testCoordinatorActionSelectorsNegative() throws Exception {
        JMSJobEventListener jMSJobEventListener = new JMSJobEventListener();
        jMSJobEventListener.init(this.conf);
        Date parseDateUTC = DateUtils.parseDateUTC("2012-07-22T00:00Z");
        CoordinatorActionEvent coordinatorActionEvent = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", DateUtils.parseDateUTC("2011-07-11T00:00Z"), parseDateUTC, (String) null);
        ConnectionContext connectionContext = getConnectionContext();
        MessageConsumer createConsumer = connectionContext.createConsumer(connectionContext.createSession(1), jMSJobEventListener.getTopic(coordinatorActionEvent), "user='Non_matching_user'");
        jMSJobEventListener.onCoordinatorActionEvent(coordinatorActionEvent);
        assertNull(createConsumer.receive(5000L));
    }

    private ConnectionContext getConnectionContext() {
        return Services.get().get(JMSAccessorService.class).createProducerConnectionContext(new JMSConnectionInfo(this.services.getConf().get("oozie.jms.producer.connection.properties")));
    }
}
