/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureQueue;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hive.org.apache.commons.logging.Log;
import org.apache.hive.org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={SmallTests.class})
public class TestMasterProcedureQueue {
    private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);
    private MasterProcedureQueue queue;
    private Configuration conf;

    @Before
    public void setUp() throws IOException {
        this.conf = HBaseConfiguration.create();
        this.queue = new MasterProcedureQueue(this.conf, new TableLockManager.NullTableLockManager());
    }

    @After
    public void tearDown() throws IOException {
        Assert.assertEquals((long)0L, (long)this.queue.size());
    }

    @Test
    public void testSimpleTableOpsQueues() throws Exception {
        int i;
        int NUM_TABLES = 10;
        int NUM_ITEMS = 10;
        int count = 0;
        for (i = 1; i <= 10; ++i) {
            TableName tableName = TableName.valueOf(String.format("test-%04d", i));
            for (int j = 1; j <= 10; ++j) {
                this.queue.addBack(new TestTableProcedure(i * 1000 + j, tableName, TableProcedureInterface.TableOperationType.EDIT));
                Assert.assertEquals((long)(++count), (long)this.queue.size());
            }
        }
        Assert.assertEquals((long)100L, (long)this.queue.size());
        for (int j = 1; j <= 10; ++j) {
            for (int i2 = 1; i2 <= 10; ++i2) {
                Long procId = this.queue.poll();
                Assert.assertEquals((long)(--count), (long)this.queue.size());
                Assert.assertEquals((long)(i2 * 1000 + j), (long)procId);
            }
        }
        Assert.assertEquals((long)0L, (long)this.queue.size());
        for (i = 1; i <= 10; ++i) {
            TableName tableName = TableName.valueOf(String.format("test-%04d", i));
            Assert.assertTrue((boolean)this.queue.markTableAsDeleted(tableName));
        }
    }

    @Test
    public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
        TableName tableName = TableName.valueOf("testtb");
        this.queue.addBack(new TestTableProcedure(1L, tableName, TableProcedureInterface.TableOperationType.EDIT));
        Assert.assertFalse((boolean)this.queue.markTableAsDeleted(tableName));
        Assert.assertEquals((long)1L, (long)this.queue.poll());
        Assert.assertTrue((boolean)this.queue.tryAcquireTableWrite(tableName, "write"));
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertFalse((boolean)this.queue.markTableAsDeleted(tableName));
        this.queue.releaseTableWrite(tableName);
        Assert.assertTrue((boolean)this.queue.markTableAsDeleted(tableName));
    }

    @Test
    public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
        int i;
        TableName tableName = TableName.valueOf("testtb");
        int nitems = 2;
        for (i = 1; i <= 2; ++i) {
            this.queue.addBack(new TestTableProcedure(i, tableName, TableProcedureInterface.TableOperationType.READ));
        }
        Assert.assertFalse((boolean)this.queue.markTableAsDeleted(tableName));
        for (i = 1; i <= 2; ++i) {
            Assert.assertEquals((long)i, (long)this.queue.poll());
            Assert.assertTrue((boolean)this.queue.tryAcquireTableRead(tableName, "read " + i));
            Assert.assertFalse((boolean)this.queue.markTableAsDeleted(tableName));
        }
        for (i = 1; i <= 2; ++i) {
            Assert.assertFalse((boolean)this.queue.markTableAsDeleted(tableName));
            this.queue.releaseTableRead(tableName);
        }
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertTrue((boolean)this.queue.markTableAsDeleted(tableName));
    }

    @Test
    public void testVerifyRwLocks() throws Exception {
        TableName tableName = TableName.valueOf("testtb");
        this.queue.addBack(new TestTableProcedure(1L, tableName, TableProcedureInterface.TableOperationType.EDIT));
        this.queue.addBack(new TestTableProcedure(2L, tableName, TableProcedureInterface.TableOperationType.READ));
        this.queue.addBack(new TestTableProcedure(3L, tableName, TableProcedureInterface.TableOperationType.EDIT));
        this.queue.addBack(new TestTableProcedure(4L, tableName, TableProcedureInterface.TableOperationType.READ));
        this.queue.addBack(new TestTableProcedure(5L, tableName, TableProcedureInterface.TableOperationType.READ));
        Long procId = this.queue.poll();
        Assert.assertEquals((long)1L, (long)procId);
        Assert.assertEquals((Object)true, (Object)this.queue.tryAcquireTableWrite(tableName, "write " + procId));
        Assert.assertEquals(null, (Object)this.queue.poll());
        this.queue.releaseTableWrite(tableName);
        procId = this.queue.poll();
        Assert.assertEquals((long)2L, (long)procId);
        Assert.assertEquals((Object)true, (Object)this.queue.tryAcquireTableRead(tableName, "read " + procId));
        procId = this.queue.poll();
        Assert.assertEquals((long)3L, (long)procId);
        Assert.assertEquals((Object)false, (Object)this.queue.tryAcquireTableWrite(tableName, "write " + procId));
        this.queue.releaseTableRead(tableName);
        Assert.assertEquals((Object)true, (Object)this.queue.tryAcquireTableWrite(tableName, "write " + procId));
        Assert.assertEquals(null, (Object)this.queue.poll());
        this.queue.releaseTableWrite(tableName);
        procId = this.queue.poll();
        Assert.assertEquals((long)4L, (long)procId);
        Assert.assertEquals((Object)true, (Object)this.queue.tryAcquireTableRead(tableName, "read " + procId));
        procId = this.queue.poll();
        Assert.assertEquals((long)5L, (long)procId);
        Assert.assertEquals((Object)true, (Object)this.queue.tryAcquireTableRead(tableName, "read " + procId));
        this.queue.releaseTableRead(tableName);
        this.queue.releaseTableRead(tableName);
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertTrue((String)"queue should be deleted", (boolean)this.queue.markTableAsDeleted(tableName));
    }

    @Test(timeout=90000L)
    public void testConcurrentWriteOps() throws Exception {
        int i;
        final TestTableProcSet procSet = new TestTableProcSet(this.queue);
        int NUM_ITEMS = 10;
        int NUM_TABLES = 4;
        final AtomicInteger opsCount = new AtomicInteger(0);
        for (int i2 = 0; i2 < 4; ++i2) {
            TableName tableName = TableName.valueOf(String.format("testtb-%04d", i2));
            for (int j = 1; j < 10; ++j) {
                procSet.addBack(new TestTableProcedure(i2 * 100 + j, tableName, TableProcedureInterface.TableOperationType.EDIT));
                opsCount.incrementAndGet();
            }
        }
        Assert.assertEquals((long)opsCount.get(), (long)this.queue.size());
        Thread[] threads = new Thread[8];
        final HashSet concurrentTables = new HashSet();
        final ArrayList failures = new ArrayList();
        final AtomicInteger concurrentCount = new AtomicInteger(0);
        for (i = 0; i < threads.length; ++i) {
            threads[i] = new Thread(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    while (opsCount.get() > 0) {
                        AbstractCollection abstractCollection;
                        try {
                            TableProcedureInterface proc = procSet.acquire();
                            if (proc == null) {
                                TestMasterProcedureQueue.this.queue.signalAll();
                                if (opsCount.get() <= 0) break;
                                continue;
                            }
                            abstractCollection = concurrentTables;
                            synchronized (abstractCollection) {
                                Assert.assertTrue((String)("unexpected concurrency on " + proc.getTableName()), (boolean)concurrentTables.add(proc.getTableName()));
                            }
                            Assert.assertTrue((opsCount.decrementAndGet() >= 0 ? 1 : 0) != 0);
                            try {
                                long procId = ((Procedure)((Object)proc)).getProcId();
                                TableName tableId = proc.getTableName();
                                int concurrent = concurrentCount.incrementAndGet();
                                Assert.assertTrue((String)("inc-concurrent=" + concurrent + " 1 <= concurrent <= " + 4), (concurrent >= 1 && concurrent <= 4 ? 1 : 0) != 0);
                                LOG.debug("[S] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent);
                                Thread.sleep(2000L);
                                concurrent = concurrentCount.decrementAndGet();
                                LOG.debug("[E] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent);
                                Assert.assertTrue((String)("dec-concurrent=" + concurrent), (concurrent < 4 ? 1 : 0) != 0);
                            }
                            finally {
                                abstractCollection = concurrentTables;
                                synchronized (abstractCollection) {
                                    Assert.assertTrue((boolean)concurrentTables.remove(proc.getTableName()));
                                }
                                procSet.release(proc);
                            }
                        }
                        catch (Throwable e) {
                            LOG.error("Failed " + e.getMessage(), e);
                            abstractCollection = failures;
                            synchronized (abstractCollection) {
                                failures.add(e.getMessage());
                            }
                        }
                        finally {
                            TestMasterProcedureQueue.this.queue.signalAll();
                        }
                    }
                }
            };
            threads[i].start();
        }
        for (i = 0; i < threads.length; ++i) {
            threads[i].join();
        }
        Assert.assertTrue((String)failures.toString(), (boolean)failures.isEmpty());
        Assert.assertEquals((long)0L, (long)opsCount.get());
        Assert.assertEquals((long)0L, (long)this.queue.size());
        for (i = 1; i <= 4; ++i) {
            TableName table = TableName.valueOf(String.format("testtb-%04d", i));
            Assert.assertTrue((String)("queue should be deleted, table=" + table), (boolean)this.queue.markTableAsDeleted(table));
        }
    }

    public static class TestTableProcedure
    extends Procedure<Void>
    implements TableProcedureInterface {
        private final TableProcedureInterface.TableOperationType opType;
        private final TableName tableName;

        public TestTableProcedure() {
            throw new UnsupportedOperationException("recovery should not be triggered here");
        }

        public TestTableProcedure(long procId, TableName tableName, TableProcedureInterface.TableOperationType opType) {
            this.tableName = tableName;
            this.opType = opType;
            this.setProcId(procId);
        }

        @Override
        public TableName getTableName() {
            return this.tableName;
        }

        @Override
        public TableProcedureInterface.TableOperationType getTableOperationType() {
            return this.opType;
        }

        @Override
        protected Procedure[] execute(Void env) {
            return null;
        }

        @Override
        protected void rollback(Void env) {
            throw new UnsupportedOperationException();
        }

        @Override
        protected boolean abort(Void env) {
            throw new UnsupportedOperationException();
        }

        @Override
        protected void serializeStateData(OutputStream stream) throws IOException {
        }

        @Override
        protected void deserializeStateData(InputStream stream) throws IOException {
        }
    }

    public static class TestTableProcSet {
        private final MasterProcedureQueue queue;
        private Map<Long, TableProcedureInterface> procsMap = new ConcurrentHashMap<Long, TableProcedureInterface>();

        public TestTableProcSet(MasterProcedureQueue queue) {
            this.queue = queue;
        }

        public void addBack(TableProcedureInterface tableProc) {
            Procedure proc = (Procedure)((Object)tableProc);
            this.procsMap.put(proc.getProcId(), tableProc);
            this.queue.addBack(proc);
        }

        public void addFront(TableProcedureInterface tableProc) {
            Procedure proc = (Procedure)((Object)tableProc);
            this.procsMap.put(proc.getProcId(), tableProc);
            this.queue.addFront(proc);
        }

        public TableProcedureInterface acquire() {
            TableProcedureInterface proc = null;
            boolean avail = false;
            while (!avail) {
                Long procId = this.queue.poll();
                TableProcedureInterface tableProcedureInterface = proc = procId != null ? this.procsMap.remove(procId) : null;
                if (proc == null) break;
                switch (proc.getTableOperationType()) {
                    case CREATE: 
                    case DELETE: 
                    case EDIT: {
                        avail = this.queue.tryAcquireTableWrite(proc.getTableName(), "op=" + (Object)((Object)proc.getTableOperationType()));
                        break;
                    }
                    case READ: {
                        avail = this.queue.tryAcquireTableRead(proc.getTableName(), "op=" + (Object)((Object)proc.getTableOperationType()));
                    }
                }
                if (avail) continue;
                this.addFront(proc);
                LOG.debug("yield procId=" + procId);
            }
            return proc;
        }

        public void release(TableProcedureInterface proc) {
            switch (proc.getTableOperationType()) {
                case CREATE: 
                case DELETE: 
                case EDIT: {
                    this.queue.releaseTableWrite(proc.getTableName());
                    break;
                }
                case READ: {
                    this.queue.releaseTableRead(proc.getTableName());
                }
            }
        }
    }
}

