package org.apache.hcatalog.hbase.snapshot;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.hbase.snapshot.ZKUtil;
import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.class */
public class ZKBasedRevisionManager implements RevisionManager {
    private static final Logger LOG = LoggerFactory.getLogger(ZKBasedRevisionManager.class);
    private String zkHostList;
    private String baseDir;
    private ZKUtil zkUtil;
    private long writeTxnTimeout;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager$RMLockListener.class */
    public class RMLockListener implements LockListener {
        RMLockListener() {
        }

        @Override // org.apache.hcatalog.hbase.snapshot.lock.LockListener
        public void lockAcquired() {
        }

        @Override // org.apache.hcatalog.hbase.snapshot.lock.LockListener
        public void lockReleased() {
        }
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void initialize(Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        if (configuration2.get(RMConstants.ZOOKEEPER_HOSTLIST) == null) {
            String str = configuration2.get("hbase.zookeeper.quorum");
            int i = configuration2.getInt("hbase.zookeeper.property.clientPort", 2181);
            String[] split = str.split(",");
            StringBuffer stringBuffer = new StringBuffer();
            for (String str2 : split) {
                stringBuffer.append(str2);
                stringBuffer.append(':');
                stringBuffer.append(i);
                stringBuffer.append(',');
            }
            stringBuffer.deleteCharAt(stringBuffer.length() - 1);
            configuration2.set(RMConstants.ZOOKEEPER_HOSTLIST, stringBuffer.toString());
        }
        this.zkHostList = configuration2.get(RMConstants.ZOOKEEPER_HOSTLIST);
        this.baseDir = configuration2.get(RMConstants.ZOOKEEPER_DATADIR);
        this.writeTxnTimeout = Long.parseLong(configuration2.get(RMConstants.WRITE_TRANSACTION_TIMEOUT));
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void open() throws IOException {
        this.zkUtil = new ZKUtil(this.zkHostList, this.baseDir);
        this.zkUtil.createRootZNodes();
        LOG.info("Created root znodes for revision manager.");
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void close() {
        this.zkUtil.closeZKConnection();
    }

    private void checkInputParams(String str, List<String> list) {
        if (str == null) {
            throw new IllegalArgumentException("The table name must be specified for reading.");
        }
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("At least one column family should be specified for reading.");
        }
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void createTable(String str, List<String> list) throws IOException {
        this.zkUtil.createRootZNodes();
        this.zkUtil.setUpZnodesForTable(str, list);
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void dropTable(String str) throws IOException {
        this.zkUtil.deleteZNodes(str);
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public Transaction beginWriteTransaction(String str, List<String> list, Long l) throws IOException {
        checkInputParams(str, list);
        this.zkUtil.setUpZnodesForTable(str, list);
        Transaction transaction = new Transaction(str, list, this.zkUtil.nextId(str), this.zkUtil.getTimeStamp());
        if (l.longValue() != -1) {
            transaction.setKeepAlive(l.longValue());
        } else {
            transaction.setKeepAlive(this.writeTxnTimeout);
        }
        refreshTransactionList(transaction.getTableName());
        WriteLock writeLock = new WriteLock(this.zkUtil.getSession(), prepareLockNode(str), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        writeLock.setLockListener(new RMLockListener());
        try {
            try {
                try {
                    if (!writeLock.lock()) {
                        throw new IOException("Unable to obtain lock while beginning transaction. " + transaction.toString());
                    }
                    List<String> columnFamilies = transaction.getColumnFamilies();
                    FamilyRevision familyRevisionInfo = transaction.getFamilyRevisionInfo();
                    Iterator<String> it = columnFamilies.iterator();
                    while (it.hasNext()) {
                        this.zkUtil.updateData(PathUtil.getRunningTxnInfoPath(this.baseDir, str, it.next()), familyRevisionInfo, ZKUtil.UpdateMode.APPEND);
                    }
                    return transaction;
                } catch (InterruptedException e) {
                    throw new IOException("Exception while obtaining lock.", e);
                }
            } catch (KeeperException e2) {
                throw new IOException("Exception while obtaining lock.", e2);
            }
        } finally {
            writeLock.unlock();
        }
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public Transaction beginWriteTransaction(String str, List<String> list) throws IOException {
        return beginWriteTransaction(str, list, -1L);
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void commitWriteTransaction(Transaction transaction) throws IOException {
        refreshTransactionList(transaction.getTableName());
        WriteLock writeLock = new WriteLock(this.zkUtil.getSession(), prepareLockNode(transaction.getTableName()), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        writeLock.setLockListener(new RMLockListener());
        try {
            try {
                if (!writeLock.lock()) {
                    throw new IOException("Unable to obtain lock while commiting transaction. " + transaction.toString());
                }
                String tableName = transaction.getTableName();
                List<String> columnFamilies = transaction.getColumnFamilies();
                FamilyRevision familyRevisionInfo = transaction.getFamilyRevisionInfo();
                Iterator<String> it = columnFamilies.iterator();
                while (it.hasNext()) {
                    this.zkUtil.updateData(PathUtil.getRunningTxnInfoPath(this.baseDir, tableName, it.next()), familyRevisionInfo, ZKUtil.UpdateMode.REMOVE);
                }
                LOG.info("Write Transaction committed: " + transaction.toString());
            } catch (InterruptedException e) {
                throw new IOException("Exception while obtaining lock.", e);
            } catch (KeeperException e2) {
                throw new IOException("Exception while obtaining lock.", e2);
            }
        } finally {
            writeLock.unlock();
        }
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void abortWriteTransaction(Transaction transaction) throws IOException {
        refreshTransactionList(transaction.getTableName());
        WriteLock writeLock = new WriteLock(this.zkUtil.getSession(), prepareLockNode(transaction.getTableName()), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        writeLock.setLockListener(new RMLockListener());
        try {
            try {
                if (!writeLock.lock()) {
                    throw new IOException("Unable to obtain lock while aborting transaction. " + transaction.toString());
                }
                String tableName = transaction.getTableName();
                List<String> columnFamilies = transaction.getColumnFamilies();
                FamilyRevision familyRevisionInfo = transaction.getFamilyRevisionInfo();
                for (String str : columnFamilies) {
                    this.zkUtil.updateData(PathUtil.getRunningTxnInfoPath(this.baseDir, tableName, str), familyRevisionInfo, ZKUtil.UpdateMode.REMOVE);
                    this.zkUtil.updateData(PathUtil.getAbortInformationPath(this.baseDir, tableName, str), familyRevisionInfo, ZKUtil.UpdateMode.APPEND);
                }
                LOG.info("Write Transaction aborted: " + transaction.toString());
            } catch (KeeperException e) {
                throw new IOException("Exception while obtaining lock.", e);
            } catch (InterruptedException e2) {
                throw new IOException("Exception while obtaining lock.", e2);
            }
        } finally {
            writeLock.unlock();
        }
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public void keepAlive(Transaction transaction) throws IOException {
        refreshTransactionList(transaction.getTableName());
        transaction.keepAliveTransaction();
        WriteLock writeLock = new WriteLock(this.zkUtil.getSession(), prepareLockNode(transaction.getTableName()), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        writeLock.setLockListener(new RMLockListener());
        try {
            try {
                if (!writeLock.lock()) {
                    throw new IOException("Unable to obtain lock for keep alive of transaction. " + transaction.toString());
                }
                String tableName = transaction.getTableName();
                List<String> columnFamilies = transaction.getColumnFamilies();
                FamilyRevision familyRevisionInfo = transaction.getFamilyRevisionInfo();
                Iterator<String> it = columnFamilies.iterator();
                while (it.hasNext()) {
                    this.zkUtil.updateData(PathUtil.getRunningTxnInfoPath(this.baseDir, tableName, it.next()), familyRevisionInfo, ZKUtil.UpdateMode.KEEP_ALIVE);
                }
            } catch (InterruptedException e) {
                throw new IOException("Exception while obtaining lock.", e);
            } catch (KeeperException e2) {
                throw new IOException("Exception while obtaining lock.", e2);
            }
        } finally {
            writeLock.unlock();
        }
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public TableSnapshot createSnapshot(String str) throws IOException {
        long j;
        refreshTransactionList(str);
        long currentID = this.zkUtil.currentID(str);
        HashMap hashMap = new HashMap();
        for (String str2 : this.zkUtil.getColumnFamiliesOfTable(str)) {
            List<FamilyRevision> transactionList = this.zkUtil.getTransactionList(PathUtil.getRunningTxnInfoPath(this.baseDir, str, str2));
            if (transactionList.isEmpty()) {
                j = currentID;
            } else {
                Collections.sort(transactionList);
                j = transactionList.get(0).getRevision() - 1;
            }
            hashMap.put(str2, Long.valueOf(j));
        }
        TableSnapshot tableSnapshot = new TableSnapshot(str, hashMap, currentID);
        LOG.debug("Created snapshot For table: " + str + " snapshot: " + tableSnapshot);
        return tableSnapshot;
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public TableSnapshot createSnapshot(String str, Long l) throws IOException {
        if (l.longValue() > this.zkUtil.currentID(str)) {
            throw new IOException("The revision specified in the snapshot is higher than the current revision of the table.");
        }
        refreshTransactionList(str);
        HashMap hashMap = new HashMap();
        Iterator<String> it = this.zkUtil.getColumnFamiliesOfTable(str).iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), l);
        }
        return new TableSnapshot(str, hashMap, l.longValue());
    }

    List<FamilyRevision> getRunningTransactions(String str, String str2) throws IOException {
        return this.zkUtil.getTransactionList(PathUtil.getRunningTxnInfoPath(this.baseDir, str, str2));
    }

    @Override // org.apache.hcatalog.hbase.snapshot.RevisionManager
    public List<FamilyRevision> getAbortedWriteTransactions(String str, String str2) throws IOException {
        return this.zkUtil.getTransactionList(PathUtil.getAbortInformationPath(this.baseDir, str, str2));
    }

    private void refreshTransactionList(String str) throws IOException {
        WriteLock writeLock = new WriteLock(this.zkUtil.getSession(), prepareLockNode(str), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        writeLock.setLockListener(new RMLockListener());
        try {
            try {
                if (!writeLock.lock()) {
                    throw new IOException("Unable to obtain lock while refreshing transactions of table " + str + ".");
                }
                Iterator<String> it = this.zkUtil.getColumnFamiliesOfTable(str).iterator();
                while (it.hasNext()) {
                    this.zkUtil.refreshTransactions(PathUtil.getRunningTxnInfoPath(this.baseDir, str, it.next()));
                }
            } catch (KeeperException e) {
                throw new IOException("Exception while obtaining lock.", e);
            } catch (InterruptedException e2) {
                throw new IOException("Exception while obtaining lock.", e2);
            }
        } finally {
            writeLock.unlock();
        }
    }

    private String prepareLockNode(String str) throws IOException {
        String lockManagementNode = PathUtil.getLockManagementNode(PathUtil.getTxnDataPath(this.baseDir, str));
        this.zkUtil.ensurePathExists(lockManagementNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        return lockManagementNode;
    }
}
