package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XCallable;

/* loaded from: input_file:org/apache/oozie/service/TestCallableQueueService.class */
public class TestCallableQueueService extends XTestCase {
    /**
     * Shared execution counter. Each {@link MyCallable} takes the next value from it when its call body
     * begins, and tests that check ordering replace it with a fresh counter so recorded orders start at zero.
     */
    static AtomicLong EXEC_ORDER = new AtomicLong();

    /* loaded from: input_file:org/apache/oozie/service/TestCallableQueueService$CLCallable.class */
    /**
     * Callable used to observe how many instances run at the same time.
     *
     * <p>Every invocation bumps a shared counter, sleeps for 100 ms and then decrements the counter again.
     * The highest counter value seen is kept as a high-water mark that tests read through
     * {@link #getConcurrency()}.
     *
     * <p>Both the counter and the high-water mark are atomics, so concurrent invocations cannot lose an
     * update and the reading thread always sees the latest value.
     */
    public static class CLCallable implements XCallable<Void> {
        /** Number of instances currently between {@code incr()} and {@code decr()}. */
        private static final AtomicInteger counter = new AtomicInteger();
        /** Largest value {@link #counter} has reached since the last {@link #resetConcurrency()}. */
        private static final AtomicInteger max = new AtomicInteger();

        public String getName() {
            return "name";
        }

        public int getPriority() {
            return 0;
        }

        public String getType() {
            return "type";
        }

        /** Returns a fresh random key on every call, so no two lookups of the key ever match. */
        public String getKey() {
            return "name_" + UUID.randomUUID();
        }

        public String getEntityKey() {
            return null;
        }

        public long getCreatedTime() {
            return 0L;
        }

        public void setInterruptMode(boolean z) {
        }

        public boolean inInterruptMode() {
            return false;
        }

        /* renamed from: call, reason: merged with bridge method [inline-methods] */
        /**
         * Marks this instance as running, sleeps for 100 ms and marks it as finished.
         *
         * @return always {@code null}
         * @throws Exception if the sleep is interrupted; the running counter is still decremented
         */
        public Void m23call() throws Exception {
            incr();
            try {
                Thread.sleep(100L);
            } finally {
                // Always undo the increment, otherwise an interrupted sleep would inflate later readings.
                decr();
            }
            return null;
        }

        /** Increments the running counter and raises the high-water mark if the new value exceeds it. */
        private void incr() {
            int running = counter.incrementAndGet();
            int observed;
            // CAS loop: only ever replace the stored maximum with a strictly larger value.
            do {
                observed = max.get();
            } while (running > observed && !max.compareAndSet(observed, running));
        }

        private void decr() {
            counter.decrementAndGet();
        }

        /**
         * @return the highest number of simultaneously running instances seen since the last reset
         */
        public static int getConcurrency() {
            return max.get();
        }

        /** Clears the high-water mark; the running counter itself is left untouched. */
        public static void resetConcurrency() {
            max.set(0);
        }
    }

    /* loaded from: input_file:org/apache/oozie/service/TestCallableQueueService$ExtendedXCommand.class */
    /**
     * Minimal {@link XCommand} whose execute body sleeps for a configurable time and then records the
     * wall-clock time at which it finished.
     *
     * <p>{@code executed} is written by the thread that runs the command and polled by the test thread, so
     * it is declared {@code volatile}.
     */
    public static class ExtendedXCommand extends XCommand<Void> {
        /** Value reported by {@link #isLockRequired()}. */
        private final boolean lockRequired;
        /** Value reported by {@link #getEntityKey()}. */
        public String lockKey;
        /** Time in milliseconds the execute body sleeps. */
        public long wait;
        /** Completion timestamp in milliseconds, or 0 while the command has not completed. */
        volatile long executed;

        /**
         * @param key first argument forwarded to the {@link XCommand} constructor
         * @param type second argument forwarded to the {@link XCommand} constructor
         * @param priority third argument forwarded to the {@link XCommand} constructor
         * @param wait time in milliseconds the execute body sleeps
         * @param lockKey value returned by {@link #getEntityKey()}
         * @param lockRequired value returned by {@link #isLockRequired()}
         */
        public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey,
                boolean lockRequired) {
            super(key, type, priority, false);
            this.lockRequired = lockRequired;
            this.lockKey = lockKey;
            this.wait = wait;
        }

        /**
         * Same as the six-argument constructor with {@code lockRequired} set to {@code true}.
         *
         * @param key first argument forwarded to the {@link XCommand} constructor
         * @param type second argument forwarded to the {@link XCommand} constructor
         * @param priority third argument forwarded to the {@link XCommand} constructor
         * @param wait time in milliseconds the execute body sleeps
         * @param lockKey value returned by {@link #getEntityKey()}
         */
        public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey) {
            this(key, type, priority, wait, lockKey, true);
        }

        protected boolean isLockRequired() {
            return this.lockRequired;
        }

        protected boolean isReQueueRequired() {
            return false;
        }

        public String getEntityKey() {
            return this.lockKey;
        }

        protected void eagerLoadState() {
        }

        protected void eagerVerifyPrecondition() throws CommandException {
        }

        protected void loadState() {
        }

        protected void verifyPrecondition() throws CommandException {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: execute, reason: merged with bridge method [inline-methods] */
        /**
         * Sleeps for {@link #wait} milliseconds and then stamps {@code executed} with the current time.
         * Does nothing if {@code executed} is already set.
         *
         * @return always {@code null}
         * @throws CommandException with {@link ErrorCode#ETEST} if the sleep is interrupted
         */
        public Void m24execute() throws CommandException {
            if (this.executed != 0) {
                return null;
            }
            try {
                Thread.sleep(this.wait);
                this.executed = System.currentTimeMillis();
                return null;
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the code running this command before reporting the failure.
                Thread.currentThread().interrupt();
                throw new CommandException(ErrorCode.ETEST, new Object[0]);
            }
        }
    }

    /* loaded from: input_file:org/apache/oozie/service/TestCallableQueueService$MyCallable.class */
    /**
     * Simple callable that records when, and in which global order, it was run.
     *
     * <p>When invoked it takes the next value of {@link TestCallableQueueService#EXEC_ORDER}, sleeps for
     * {@code wait} milliseconds and then stores the completion time in {@code executed}.
     *
     * <p>{@code executed} is polled from the test thread while another thread writes it, so it is
     * {@code volatile}. {@code order} is written before {@code executed}, so a reader that has seen a
     * non-zero {@code executed} also sees the matching {@code order}.
     */
    public static class MyCallable implements XCallable<Void> {
        String type;
        int priority;
        /** Completion timestamp in milliseconds, or 0 while the callable has not completed. */
        volatile long executed = 0L;
        /** Time in milliseconds the call body sleeps. */
        int wait;
        /** Value taken from {@code EXEC_ORDER} when the call body started. */
        long order;
        /** Construction timestamp in milliseconds. */
        long created = System.currentTimeMillis();
        private String name = "myCallable";
        private String key = null;

        /** Creates a callable of type {@code "type"} with priority 0 and no sleep. */
        public MyCallable() {
            this(0, 0);
        }

        public String getName() {
            return this.name;
        }

        public String getType() {
            return this.type;
        }

        /**
         * Creates a callable with a randomly generated key.
         *
         * @param type value returned by {@link #getType()}
         * @param priority value returned by {@link #getPriority()}
         * @param wait time in milliseconds the call body sleeps
         */
        public MyCallable(String type, int priority, int wait) {
            this.type = type;
            this.priority = priority;
            this.wait = wait;
            this.key = this.name + "_" + UUID.randomUUID();
        }

        /**
         * Creates a callable with an explicit key.
         *
         * @param key value returned by {@link #getKey()}
         * @param type value returned by {@link #getType()}
         * @param priority value returned by {@link #getPriority()}
         * @param wait time in milliseconds the call body sleeps
         */
        public MyCallable(String key, String type, int priority, int wait) {
            this.type = type;
            this.priority = priority;
            this.wait = wait;
            this.key = key;
        }

        /**
         * Creates a callable of type {@code "type"} with a randomly generated key.
         *
         * @param priority value returned by {@link #getPriority()}
         * @param wait time in milliseconds the call body sleeps
         */
        public MyCallable(int priority, int wait) {
            this("type", priority, wait);
        }

        public int getPriority() {
            return this.priority;
        }

        public long getCreatedTime() {
            return this.created;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Type:").append(getType());
            sb.append(",Priority:").append(getPriority());
            return sb.toString();
        }

        /* renamed from: call, reason: merged with bridge method [inline-methods] */
        /**
         * Records the execution order, sleeps for {@code wait} milliseconds and records the completion time.
         *
         * @return always {@code null}
         * @throws Exception if the sleep is interrupted; {@code executed} then stays unchanged
         */
        public Void m25call() throws Exception {
            this.order = TestCallableQueueService.EXEC_ORDER.getAndIncrement();
            Thread.sleep(this.wait);
            this.executed = System.currentTimeMillis();
            return null;
        }

        public String getKey() {
            return this.key;
        }

        public String getEntityKey() {
            return null;
        }

        public void setInterruptMode(boolean z) {
        }

        public boolean inInterruptMode() {
            return false;
        }
    }

    /**
     * Queues a single callable and checks that it gets executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueuing() throws Exception {
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final MyCallable callable = new MyCallable();
            queueService.queue(callable);
            waitFor(1000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable.executed != 0;
                }
            });
            assertTrue(callable.executed != 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues a callable with a 1000 ms delay and checks that it completed no earlier than 1000 ms after
     * the time sampled just before queuing.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testDelayedQueuing() throws Exception {
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final MyCallable callable = new MyCallable();
            long queuedAt = System.currentTimeMillis();
            queueService.queue(callable, 1000L);
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable.executed != 0;
                }
            });
            assertTrue(callable.executed >= queuedAt + 1000);
        } finally {
            // Tear the services down even when the wait or the assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues four priority-0 callables followed by one priority-1 callable, with the
     * {@code oozie.service.CallableQueueService.threads} property set to 1. Checks that all of them ran
     * and that the priority-1 callable started before the last priority-0 callable, which was queued
     * ahead of it.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testPriorityExecution() throws Exception {
        EXEC_ORDER = new AtomicLong();
        setSystemProperty("oozie.service.CallableQueueService.threads", "1");
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final MyCallable slow1 = new MyCallable(0, 200);
            final MyCallable slow2 = new MyCallable(0, 200);
            final MyCallable slow3 = new MyCallable(0, 200);
            final MyCallable lowPriority = new MyCallable();
            final MyCallable highPriority = new MyCallable(1, 10);
            queueService.queue(slow1);
            queueService.queue(slow2);
            queueService.queue(slow3);
            queueService.queue(lowPriority);
            queueService.queue(highPriority);
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return slow1.executed != 0 && slow2.executed != 0 && slow3.executed != 0
                            && lowPriority.executed != 0 && highPriority.executed != 0;
                }
            });
            // "executed" is 0 until a callable has run, so it must be compared against 0 strictly;
            // a ">= 0" check would pass even if nothing ran.
            assertTrue(slow1.executed != 0);
            assertTrue(slow2.executed != 0);
            assertTrue(slow3.executed != 0);
            assertTrue(lowPriority.executed != 0);
            assertTrue(highPriority.executed != 0);
            assertTrue(highPriority.order < lowPriority.order);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Submits three callables through {@code queueSerial} as one list and checks that they were started in
     * list order (execution orders 0, 1, 2).
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueSerial() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable first = new MyCallable(0, 10);
            final MyCallable second = new MyCallable(0, 10);
            final MyCallable third = new MyCallable(0, 10);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            queueService.queueSerial(Arrays.asList(first, second, third));
            waitFor(100, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return first.executed != 0 && second.executed != 0 && third.executed != 0;
                }
            });
            assertEquals(0L, first.order);
            assertEquals(1L, second.order);
            assertEquals(2L, third.order);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues ten {@link CLCallable} instances with a 10 ms delay, waits for the queue to drain and checks
     * that at most three of them were observed running at the same time.
     *
     * <p>NOTE(review): the bound of 3 is presumably the service's per-type concurrency setting; confirm
     * against the {@code CallableQueueService} configuration.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testConcurrencyLimit() throws Exception {
        Services services = new Services();
        services.init();
        try {
            CLCallable.resetConcurrency();
            final CallableQueueService queueService = services.get(CallableQueueService.class);
            for (int i = 0; i < 10; i++) {
                queueService.queue(new CLCallable(), 10L);
            }
            // Run this wait with a ratio of exactly 1 and put the previous ratio back afterwards.
            float savedRatio = XTestCase.WAITFOR_RATIO;
            try {
                XTestCase.WAITFOR_RATIO = 1.0f;
                waitFor(2000, new XTestCase.Predicate() {
                    @Override
                    public boolean evaluate() throws Exception {
                        return queueService.queueSize() == 0;
                    }
                });
            } finally {
                XTestCase.WAITFOR_RATIO = savedRatio;
            }
            System.out.println("Callable Queue Size :" + queueService.queueSize());
            System.out.println("CLCallable Concurrency :" + CLCallable.getConcurrency());
            assertTrue(CLCallable.getConcurrency() <= 3);
        } finally {
            // Tear the services down even when the wait or the assertion fails.
            services.destroy();
        }
    }

    /**
     * With {@code oozie.service.CallableQueueService.callable.next.eligible} set to {@code "true"}, queues
     * one callable of type {@code "other"} (1000 ms delay) and six callables of the default type (10 ms
     * delay). After the queue drains, checks that every default-type callable ran and that the
     * {@code "other"} callable completed before the last of them.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testConcurrencyReachedAndChooseNextEligible() throws Exception {
        setSystemProperty("oozie.service.CallableQueueService.callable.next.eligible", "true");
        Services services = new Services();
        services.init();
        try {
            CLCallable.resetConcurrency();
            final CallableQueueService queueService = services.get(CallableQueueService.class);
            List<MyCallable> callables = Arrays.asList(new MyCallable(0, 100), new MyCallable(0, 100),
                    new MyCallable(0, 100), new MyCallable(0, 100), new MyCallable(0, 100),
                    new MyCallable(0, 100));
            MyCallable callableOther = new MyCallable("other", 0, 100);
            queueService.queue(callableOther, 1000L);
            for (MyCallable callable : callables) {
                queueService.queue(callable, 10L);
            }
            // Run this wait with a ratio of exactly 1 and put the previous ratio back afterwards.
            float savedRatio = XTestCase.WAITFOR_RATIO;
            try {
                XTestCase.WAITFOR_RATIO = 1.0f;
                waitFor(2000, new XTestCase.Predicate() {
                    @Override
                    public boolean evaluate() throws Exception {
                        return queueService.queueSize() == 0;
                    }
                });
            } finally {
                XTestCase.WAITFOR_RATIO = savedRatio;
            }
            System.out.println("Callable Queue Size :" + queueService.queueSize());
            long lastExecuted = Long.MIN_VALUE;
            for (MyCallable callable : callables) {
                System.out.println("Callable C executed :" + callable.executed);
                assertTrue(callable.executed != 0);
                lastExecuted = Math.max(lastExecuted, callable.executed);
            }
            System.out.println("Callable callableOther executed :" + callableOther.executed);
            assertTrue(callableOther.executed < lastExecuted);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues five same-type callables, each through its own {@code queueSerial} call and each paired with
     * a second callable of a distinct type. Checks that all five ran and that at least two of them
     * completed later than the earliest one.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testSerialConcurrencyLimit() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 = new MyCallable("TestSerialConcurrencyLimit", 0, 100);
            final MyCallable callable2 = new MyCallable("TestSerialConcurrencyLimit", 0, 100);
            final MyCallable callable3 = new MyCallable("TestSerialConcurrencyLimit", 0, 100);
            final MyCallable callable4 = new MyCallable("TestSerialConcurrencyLimit", 0, 100);
            final MyCallable callable5 = new MyCallable("TestSerialConcurrencyLimit", 0, 100);
            List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3, callable4, callable5);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            String type = "SerialConcurrencyLimit";
            for (MyCallable callable : callables) {
                // Each companion callable gets its own type by appending one more "x" per iteration.
                type = type + "x";
                queueService.queueSerial(Arrays.asList(callable, new MyCallable(type, 0, 0)));
            }
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0
                            && callable4.executed != 0 && callable5.executed != 0;
                }
            });
            long firstExecuted = Long.MAX_VALUE;
            for (MyCallable callable : callables) {
                assertTrue(callable.executed != 0);
                firstExecuted = Math.min(firstExecuted, callable.executed);
            }
            int executedLater = 0;
            for (MyCallable callable : callables) {
                if (callable.executed - firstExecuted > 0) {
                    executedLater++;
                }
            }
            assertTrue(executedLater >= 2);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues five callables of the same type individually and checks that all of them ran and that at
     * least two completed later than the earliest one.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testConcurrency() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 = new MyCallable("TestConcurrency", 0, 100);
            final MyCallable callable2 = new MyCallable("TestConcurrency", 0, 100);
            final MyCallable callable3 = new MyCallable("TestConcurrency", 0, 100);
            final MyCallable callable4 = new MyCallable("TestConcurrency", 0, 100);
            final MyCallable callable5 = new MyCallable("TestConcurrency", 0, 100);
            List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3, callable4, callable5);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            for (MyCallable callable : callables) {
                queueService.queue(callable);
            }
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0
                            && callable4.executed != 0 && callable5.executed != 0;
                }
            });
            long firstExecuted = Long.MAX_VALUE;
            for (MyCallable callable : callables) {
                assertTrue(callable.executed != 0);
                firstExecuted = Math.min(firstExecuted, callable.executed);
            }
            int executedLater = 0;
            for (MyCallable callable : callables) {
                if (callable.executed - firstExecuted > 0) {
                    executedLater++;
                }
            }
            assertTrue(executedLater >= 2);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues three callables that share one key and checks that only the first one is executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueUniquenessWithSameKey() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 =
                    new MyCallable("QueueUniquenessWithSameKey", "QueueUniquenessWithSameKey", 0, 100);
            final MyCallable callable2 =
                    new MyCallable("QueueUniquenessWithSameKey", "QueueUniquenessWithSameKey", 0, 100);
            final MyCallable callable3 =
                    new MyCallable("QueueUniquenessWithSameKey", "QueueUniquenessWithSameKey", 0, 100);
            List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            for (MyCallable callable : callables) {
                queueService.queue(callable);
            }
            waitFor(200, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0;
                }
            });
            assertTrue(callable1.executed != 0);
            assertTrue(callable2.executed == 0);
            assertTrue(callable3.executed == 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues three callables that share one key, each through its own delayed {@code queueSerial} call and
     * each paired with a companion callable of a distinct type. Checks that only the first of the three
     * is executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueUniquenessWithSameKeyInComposite() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 = new MyCallable("QueueUniquenessWithSameKeyInComposite",
                    "QueueUniquenessWithSameKeyInComposite", 0, 200);
            final MyCallable callable2 = new MyCallable("QueueUniquenessWithSameKeyInComposite",
                    "QueueUniquenessWithSameKeyInComposite", 0, 200);
            final MyCallable callable3 = new MyCallable("QueueUniquenessWithSameKeyInComposite",
                    "QueueUniquenessWithSameKeyInComposite", 0, 200);
            List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            String type = "QueueUniquenessWithSameKeyInComposite";
            for (MyCallable callable : callables) {
                // Each companion callable gets its own type by appending one more "x" per iteration.
                type = type + "x";
                queueService.queueSerial(Arrays.asList(callable, new MyCallable(type, 0, 0)), 200L);
            }
            waitFor(2000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0;
                }
            });
            assertTrue(callable1.executed != 0);
            assertTrue(callable2.executed == 0);
            assertTrue(callable3.executed == 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Submits three callables that share one key in a single {@code queueSerial} call and checks that only
     * the first one is executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueUniquenessWithSameKeyInOneComposite() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 = new MyCallable("QueueUniquenessWithSameKeyInOneComposite",
                    "QueueUniquenessWithSameKeyInOneComposite", 0, 100);
            final MyCallable callable2 = new MyCallable("QueueUniquenessWithSameKeyInOneComposite",
                    "QueueUniquenessWithSameKeyInOneComposite", 0, 100);
            final MyCallable callable3 = new MyCallable("QueueUniquenessWithSameKeyInOneComposite",
                    "QueueUniquenessWithSameKeyInOneComposite", 0, 100);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            queueService.queueSerial(Arrays.asList(callable1, callable2, callable3));
            waitFor(200, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0;
                }
            });
            assertTrue(callable1.executed != 0);
            assertTrue(callable2.executed == 0);
            assertTrue(callable3.executed == 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues three callables of the same type but with different keys and checks that all of them are
     * executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueUniquenessWithDiffKey() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 =
                    new MyCallable("QueueUniquenessWithDiffKey1", "QueueUniquenessWithDiffKey", 0, 100);
            final MyCallable callable2 =
                    new MyCallable("QueueUniquenessWithDiffKey2", "QueueUniquenessWithDiffKey", 0, 100);
            final MyCallable callable3 =
                    new MyCallable("QueueUniquenessWithDiffKey3", "QueueUniquenessWithDiffKey", 0, 100);
            List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            for (MyCallable callable : callables) {
                queueService.queue(callable);
            }
            waitFor(200, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0;
                }
            });
            assertTrue(callable1.executed != 0);
            assertTrue(callable2.executed != 0);
            assertTrue(callable3.executed != 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues three callables with different keys, each through its own {@code queueSerial} call and each
     * paired with a companion callable of a distinct type. Checks that all three are executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueUniquenessWithDiffKeyInComposite() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 = new MyCallable("QueueUniquenessWithDiffKeyInComposite1",
                    "QueueUniquenessWithDiffKeyInComposite", 0, 100);
            final MyCallable callable2 = new MyCallable("QueueUniquenessWithDiffKeyInComposite2",
                    "QueueUniquenessWithDiffKeyInComposite", 0, 100);
            final MyCallable callable3 = new MyCallable("QueueUniquenessWithDiffKeyInComposite3",
                    "QueueUniquenessWithDiffKeyInComposite", 0, 100);
            List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            String type = "QueueUniquenessWithDiffKeyInComposite";
            for (MyCallable callable : callables) {
                // Each companion callable gets its own type by appending one more "x" per iteration.
                type = type + "x";
                queueService.queueSerial(Arrays.asList(callable, new MyCallable(type, 0, 0)));
            }
            waitFor(200, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0;
                }
            });
            assertTrue(callable1.executed != 0);
            assertTrue(callable2.executed != 0);
            assertTrue(callable3.executed != 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Submits three callables with different keys in a single {@code queueSerial} call and checks that all
     * of them are executed.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testQueueUniquenessWithDiffKeyInOneComposite() throws Exception {
        EXEC_ORDER = new AtomicLong();
        Services services = new Services();
        services.init();
        try {
            final MyCallable callable1 = new MyCallable("QueueUniquenessWithDiffKeyInOneComposite1",
                    "QueueUniquenessWithDiffKeyInOneComposite", 0, 100);
            final MyCallable callable2 = new MyCallable("QueueUniquenessWithDiffKeyInOneComposite2",
                    "QueueUniquenessWithDiffKeyInOneComposite", 0, 100);
            final MyCallable callable3 = new MyCallable("QueueUniquenessWithDiffKeyInOneComposite3",
                    "QueueUniquenessWithDiffKeyInOneComposite", 0, 100);
            CallableQueueService queueService = services.get(CallableQueueService.class);
            queueService.queueSerial(Arrays.asList(callable1, callable2, callable3));
            waitFor(200, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0;
                }
            });
            assertTrue(callable1.executed != 0);
            assertTrue(callable2.executed != 0);
            assertTrue(callable3.executed != 0);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * With the {@code threads} property set to 1 and {@code "testKill"} registered under the
     * {@code InterruptTypes} property, queues an initial command, ten commands sharing the lock key
     * {@code "lockKey"}, and finally a {@code "testKill"} command on that same lock key. Checks that the
     * {@code "testKill"} command completed before the sixth of the ten commands even though it was queued
     * last.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testInterrupt() throws Exception {
        EXEC_ORDER = new AtomicLong();
        setSystemProperty("oozie.service.CallableQueueService.threads", "1");
        setSystemProperty("oozie.service.CallableQueueService.InterruptTypes", "testKill");
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final ExtendedXCommand initialCommand =
                    new ExtendedXCommand("initialKey", "initialType", 2, 200, "initialLockKey");
            final List<ExtendedXCommand> commands = new ArrayList<ExtendedXCommand>();
            for (int i = 0; i < 10; i++) {
                commands.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
            }
            final ExtendedXCommand interruptCommand =
                    new ExtendedXCommand("keyInt", "testKill", 0, 200, "lockKey");
            queueService.queue(initialCommand);
            for (ExtendedXCommand command : commands) {
                queueService.queue(command);
            }
            queueService.queue(interruptCommand);
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    if (initialCommand.executed == 0 || interruptCommand.executed == 0) {
                        return false;
                    }
                    for (ExtendedXCommand command : commands) {
                        if (command.executed == 0) {
                            return false;
                        }
                    }
                    return true;
                }
            });
            assertTrue(initialCommand.executed > 0);
            assertTrue(interruptCommand.executed > 0);
            assertTrue(interruptCommand.executed < commands.get(5).executed);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Same set-up as the single-lock-key interrupt test, except that each of the ten commands uses its own
     * lock key ({@code "lockKey0"} to {@code "lockKey9"}) while the {@code "testKill"} command uses
     * {@code "lockKey"}. Checks that the {@code "testKill"} command completed after the sixth of the ten
     * commands.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testInterruptsWithDistinguishedLockKeys() throws Exception {
        EXEC_ORDER = new AtomicLong();
        setSystemProperty("oozie.service.CallableQueueService.threads", "1");
        setSystemProperty("oozie.service.CallableQueueService.InterruptTypes", "testKill");
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final ExtendedXCommand initialCommand =
                    new ExtendedXCommand("initialKey", "initialType", 2, 200, "initialLockKey");
            final List<ExtendedXCommand> commands = new ArrayList<ExtendedXCommand>();
            for (int i = 0; i < 10; i++) {
                commands.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey" + i));
            }
            final ExtendedXCommand interruptCommand =
                    new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey");
            queueService.queue(initialCommand);
            for (ExtendedXCommand command : commands) {
                queueService.queue(command);
            }
            queueService.queue(interruptCommand);
            waitFor(6000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    if (initialCommand.executed == 0 || interruptCommand.executed == 0) {
                        return false;
                    }
                    for (ExtendedXCommand command : commands) {
                        if (command.executed == 0) {
                            return false;
                        }
                    }
                    return true;
                }
            });
            assertTrue(initialCommand.executed > 0);
            assertTrue(interruptCommand.executed > 0);
            assertTrue(interruptCommand.executed > commands.get(5).executed);
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * With the {@code threads} property set to 1 and {@code "testKill"} registered under the
     * {@code InterruptTypes} property, queues an initial command, then ten commands sharing the lock key
     * {@code "lockKey"} as one {@code queueSerial} batch, and finally a {@code "testKill"} command on that
     * lock key. Checks that the {@code "testKill"} command completed before every command of the batch.
     *
     * @throws Exception if starting the services, queuing or waiting fails
     */
    public void testInterruptsWithCompositeCallable() throws Exception {
        EXEC_ORDER = new AtomicLong();
        setSystemProperty("oozie.service.CallableQueueService.threads", "1");
        setSystemProperty("oozie.service.CallableQueueService.InterruptTypes", "testKill");
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final ExtendedXCommand initialCommand =
                    new ExtendedXCommand("initialKey", "initialType", 2, 200, "initialLockKey");
            final List<ExtendedXCommand> commands = new ArrayList<ExtendedXCommand>();
            for (int i = 0; i < 10; i++) {
                commands.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
            }
            final ExtendedXCommand interruptCommand =
                    new ExtendedXCommand("key5", "testKill", 0, 200, "lockKey");
            queueService.queue(initialCommand);
            queueService.queueSerial(commands, 0L);
            queueService.queue(interruptCommand);
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    if (initialCommand.executed == 0 || interruptCommand.executed == 0) {
                        return false;
                    }
                    for (ExtendedXCommand command : commands) {
                        if (command.executed == 0) {
                            return false;
                        }
                    }
                    return true;
                }
            });
            assertTrue(initialCommand.executed > 0);
            assertTrue(interruptCommand.executed > 0);
            for (ExtendedXCommand command : commands) {
                assertTrue(interruptCommand.executed < command.executed);
            }
        } finally {
            // Tear the services down even when the wait or an assertion fails.
            services.destroy();
        }
    }

    /**
     * Queues one initial command followed by a serial batch of ten commands whose element at
     * index 5 has the configured interrupt type ("testKill"), then checks that the element at
     * index 1 ends up with a larger {@code executed} value than the element at index 5.
     *
     * <p>NOTE(review): {@code executed} is presumably an execution-order stamp derived from
     * {@link #EXEC_ORDER}, so the final assertion would mean the interrupt-type member ran before
     * the member at index 1 - confirm against {@code ExtendedXCommand}.
     *
     * @throws Exception if service start-up, queuing or waiting fails
     */
    public void testInterruptsInCompositeCallable() throws Exception {
        EXEC_ORDER = new AtomicLong();
        setSystemProperty("oozie.service.CallableQueueService.threads", "1");
        setSystemProperty("oozie.service.CallableQueueService.InterruptTypes", "testKill");
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final ExtendedXCommand initialCommand =
                    new ExtendedXCommand("initialKey", "initialType", 2, 200, "initialLockKey");

            // Indices 0-4 and 6-9 are ordinary commands; index 5 carries the interrupt type.
            final List<ExtendedXCommand> serialCommands = new ArrayList<ExtendedXCommand>();
            for (int i = 0; i < 5; i++) {
                serialCommands.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
            }
            serialCommands.add(new ExtendedXCommand("key5", "testKill", 1, 100, "lockKey"));
            for (int i = 6; i < 10; i++) {
                serialCommands.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
            }

            queueService.queue(initialCommand);
            queueService.queueSerial(serialCommands, 0L);

            // Poll until every queued command reports a non-zero "executed" value (or the timeout elapses).
            waitFor(3000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    if (initialCommand.executed == 0) {
                        return false;
                    }
                    for (ExtendedXCommand command : serialCommands) {
                        if (command.executed == 0) {
                            return false;
                        }
                    }
                    return true;
                }
            });

            assertTrue(initialCommand.executed > 0);
            assertTrue(serialCommands.get(1).executed > serialCommands.get(5).executed);
        } finally {
            // Always tear the services down, even when an assertion above fails.
            services.destroy();
        }
    }

    /**
     * Runs with the {@code InterruptMapMaxSize} property set to "0": queues one initial command,
     * ten ordinary commands (individually, not as a serial batch) and finally a command of the
     * configured interrupt type ("testKill"), then checks that the interrupt command ends up with
     * a larger {@code executed} value than the ordinary command at index 5.
     *
     * <p>NOTE(review): {@code executed} is presumably an execution-order stamp derived from
     * {@link #EXEC_ORDER}, so the final assertion would mean the interrupt command did not
     * overtake the command at index 5 - confirm against {@code ExtendedXCommand}.
     *
     * @throws Exception if service start-up, queuing or waiting fails
     */
    public void testMaxInterruptMapSize() throws Exception {
        EXEC_ORDER = new AtomicLong();
        setSystemProperty("oozie.service.CallableQueueService.threads", "1");
        setSystemProperty("oozie.service.CallableQueueService.InterruptTypes", "testKill");
        setSystemProperty("oozie.service.CallableQueueService.InterruptMapMaxSize", "0");
        Services services = new Services();
        services.init();
        try {
            CallableQueueService queueService = services.get(CallableQueueService.class);
            final ExtendedXCommand initialCommand =
                    new ExtendedXCommand("initialKey", "initialType", 2, 100, "initialLockKey");
            final List<ExtendedXCommand> commands = new ArrayList<ExtendedXCommand>();
            for (int i = 0; i < 10; i++) {
                commands.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
            }
            final ExtendedXCommand interruptCommand = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey");

            queueService.queue(initialCommand);
            // Queue one by one, in list order.
            for (ExtendedXCommand command : commands) {
                queueService.queue(command);
            }
            queueService.queue(interruptCommand);

            // Poll until every queued command reports a non-zero "executed" value (or the timeout elapses).
            waitFor(5000, new XTestCase.Predicate() {
                @Override
                public boolean evaluate() throws Exception {
                    if (initialCommand.executed == 0 || interruptCommand.executed == 0) {
                        return false;
                    }
                    for (ExtendedXCommand command : commands) {
                        if (command.executed == 0) {
                            return false;
                        }
                    }
                    return true;
                }
            });

            assertTrue(initialCommand.executed > 0);
            assertTrue(interruptCommand.executed > 0);
            assertTrue(interruptCommand.executed > commands.get(5).executed);
        } finally {
            // Always tear the services down, even when an assertion above fails.
            services.destroy();
        }
    }
}
