package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.class */
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
    // Peers tracked in memory by this instance, keyed by peer id. The constructor assigns a
    // ConcurrentHashMap; several methods cast this field to ConcurrentMap for atomic operations.
    private Map<String, ReplicationPeerZKImpl> peerClusters;
    // Name of the per-peer child znode that stores the table-CFs string; read from the
    // "zookeeper.znode.replication.peers.tableCFs" configuration key (default "tableCFs").
    private final String tableCFsNodeName;
    // Class-wide logger.
    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);

    /**
     * Creates the peer manager on top of the ZooKeeper-backed replication state base class.
     *
     * @param zooKeeperWatcher ZooKeeper handle passed to the superclass
     * @param configuration    configuration passed to the superclass; also consulted for the
     *                         name of the per-peer tableCFs child znode
     * @param abortable        abort hook passed to the superclass
     */
    public ReplicationPeersZKImpl(ZooKeeperWatcher zooKeeperWatcher, Configuration configuration, Abortable abortable) {
        super(zooKeeperWatcher, configuration, abortable);
        // Starts empty; entries are added by createAndAddPeer().
        this.peerClusters = new ConcurrentHashMap<>();
        this.tableCFsNodeName = configuration.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
    }

    /**
     * Makes sure the peers znode exists and then registers every peer already stored under it.
     *
     * @throws ReplicationException if ZooKeeper reports a {@code KeeperException}, or if
     *                              loading the existing peers fails
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void init() throws ReplicationException {
        try {
            boolean peersZNodeMissing = ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0;
            if (peersZNodeMissing) {
                ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
            }
            addExistingPeers();
        } catch (KeeperException ke) {
            throw new ReplicationException("Could not initialize replication peers", ke);
        }
    }

    /**
     * Registers a new replication peer in ZooKeeper.
     *
     * <p>Three znodes are created through a single {@code ZKUtil.multiOrSequential} call: the
     * peer znode holding the serialized peer configuration, the peer state znode initialised
     * with {@code ENABLED_ZNODE_BYTES}, and the tableCFs znode.
     *
     * @param str                   id of the peer to add
     * @param replicationPeerConfig configuration serialized into the peer znode
     * @param str2                  table-CFs string for the peer; {@code null} is stored as
     *                              an empty string
     * @throws IllegalArgumentException if the id already exists, contains
     *                                  {@code HelpFormatter.DEFAULT_OPT_PREFIX}, or equals
     *                                  {@code ReplicationQueuesZKImpl.RS_LOCK_ZNODE}
     * @throws ReplicationException     if ZooKeeper reports a {@code KeeperException}
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void addPeer(String str, ReplicationPeerConfig replicationPeerConfig, String str2) throws ReplicationException {
        try {
            if (peerExists(str)) {
                throw new IllegalArgumentException("Cannot add a peer with id=" + str + " because that id already exists.");
            }
            // NOTE(review): DEFAULT_OPT_PREFIX is the commons-cli option prefix; it presumably
            // stands in for a literal hyphen here (decompiler constant substitution) - confirm.
            if (str.contains(HelpFormatter.DEFAULT_OPT_PREFIX) || str.equals(ReplicationQueuesZKImpl.RS_LOCK_ZNODE)) {
                throw new IllegalArgumentException("Found invalid peer name:" + str);
            }
            ZKUtil.createWithParents(this.zookeeper, this.peersZNode);

            String tableCFs = str2 == null ? "" : str2;
            List<ZKUtil.ZKUtilOp> ops = new ArrayList<>(3);
            ops.add(ZKUtil.ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, str), toByteArray(replicationPeerConfig)));
            ops.add(ZKUtil.ZKUtilOp.createAndFailSilent(getPeerStateNode(str), ENABLED_ZNODE_BYTES));
            ops.add(ZKUtil.ZKUtilOp.createAndFailSilent(getTableCFsNode(str), Bytes.toBytes(tableCFs)));
            ZKUtil.multiOrSequential(this.zookeeper, ops, false);
        } catch (KeeperException e) {
            // Message previously read "peerConfif=>" (typo).
            throw new ReplicationException("Could not add peer with id=" + str + ", peerConfig=>" + replicationPeerConfig, e);
        }
    }

    /**
     * Deletes a peer's znode, and everything below it, from ZooKeeper.
     *
     * @param str id of the peer to remove
     * @throws IllegalArgumentException if no peer with that id exists
     * @throws ReplicationException     if ZooKeeper reports a {@code KeeperException}
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void removePeer(String str) throws ReplicationException {
        try {
            if (peerExists(str)) {
                String peerZNode = ZKUtil.joinZNode(this.peersZNode, str);
                ZKUtil.deleteNodeRecursively(this.zookeeper, peerZNode);
                return;
            }
            throw new IllegalArgumentException("Cannot remove peer with id=" + str + " because that id does not exist.");
        } catch (KeeperException ke) {
            throw new ReplicationException("Could not remove peer with id=" + str, ke);
        }
    }

    /**
     * Switches the peer's stored state to ENABLED and logs the change.
     *
     * @param str id of the peer to enable
     * @throws ReplicationException if the state change fails
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void enablePeer(String str) throws ReplicationException {
        changePeerState(str, ZooKeeperProtos.ReplicationState.State.ENABLED);
        String message = "peer " + str + " is enabled";
        LOG.info(message);
    }

    /**
     * Switches the peer's stored state to DISABLED and logs the change.
     *
     * @param str id of the peer to disable
     * @throws ReplicationException if the state change fails
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void disablePeer(String str) throws ReplicationException {
        changePeerState(str, ZooKeeperProtos.ReplicationState.State.DISABLED);
        String message = "peer " + str + " is disabled";
        LOG.info(message);
    }

    /**
     * Reads the table-CFs string stored in the peer's tableCFs znode.
     *
     * @param str id of the peer
     * @return the znode content decoded with {@code Bytes.toString}
     * @throws IllegalArgumentException if no peer with that id exists
     * @throws ReplicationException     if the existence check hits a {@code KeeperException},
     *                                  or if reading the znode fails for any reason
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public String getPeerTableCFsConfig(String str) throws ReplicationException {
        try {
            if (!peerExists(str)) {
                throw new IllegalArgumentException("peer " + str + " doesn't exist");
            }
            try {
                return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(str)));
            } catch (InterruptedException e) {
                // Restore the interrupt flag before wrapping, so callers further up the stack
                // can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new ReplicationException(e);
            } catch (Exception e) {
                throw new ReplicationException(e);
            }
        } catch (KeeperException e2) {
            throw new ReplicationException("Unable to get tableCFs of the peer with id=" + str, e2);
        }
    }

    /**
     * Stores a new table-CFs string in the peer's tableCFs znode, creating the znode if it
     * does not exist yet.
     *
     * @param str  id of the peer
     * @param str2 table-CFs string to store
     * @throws IllegalArgumentException if no peer with that id exists
     * @throws ReplicationException     if ZooKeeper reports a {@code KeeperException}
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void setPeerTableCFsConfig(String str, String str2) throws ReplicationException {
        try {
            if (!peerExists(str)) {
                throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + str + " does not exist.");
            }
            String znode = getTableCFsNode(str);
            byte[] payload = Bytes.toBytes(str2);
            // checkExists() == -1 is treated as "znode absent".
            boolean znodeAbsent = ZKUtil.checkExists(this.zookeeper, znode) == -1;
            if (znodeAbsent) {
                ZKUtil.createAndWatch(this.zookeeper, znode, payload);
            } else {
                ZKUtil.setData(this.zookeeper, znode, payload);
            }
            LOG.info("Peer tableCFs with id= " + str + " is now " + str2);
        } catch (KeeperException ke) {
            throw new ReplicationException("Unable to change tableCFs of the peer with id=" + str, ke);
        }
    }

    /**
     * Returns the table-CFs map of a peer that is tracked in memory.
     *
     * @param str id of the peer
     * @return the value of the tracked peer's {@code getTableCFs()}
     * @throws IllegalArgumentException if the peer is not in the in-memory peer map
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public Map<TableName, List<String>> getTableCFs(String str) throws IllegalArgumentException {
        ReplicationPeerZKImpl peer = this.peerClusters.get(str);
        if (peer != null) {
            return peer.getTableCFs();
        }
        throw new IllegalArgumentException("Peer with id= " + str + " is not connected");
    }

    /**
     * Reports whether a peer tracked in memory is currently enabled.
     *
     * @param str id of the peer
     * @return {@code true} when the tracked peer's state is {@code PeerState.ENABLED}
     * @throws IllegalArgumentException if the peer is not in the in-memory peer map
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public boolean getStatusOfPeer(String str) {
        ReplicationPeerZKImpl peer = this.peerClusters.get(str);
        if (peer != null) {
            return peer.getPeerState() == ReplicationPeer.PeerState.ENABLED;
        }
        throw new IllegalArgumentException("Peer with id= " + str + " is not connected");
    }

    /**
     * Reads the peer's enabled/disabled state directly from its state znode rather than from
     * the in-memory peer map.
     *
     * @param str id of the peer
     * @return the result of {@code ReplicationPeerZKImpl.isStateEnabled} on the znode content
     * @throws IllegalArgumentException if no peer with that id exists
     * @throws ReplicationException     if the existence check or the read hits a
     *                                  {@code KeeperException}, the content cannot be
     *                                  deserialized, or the thread is interrupted
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public boolean getStatusOfPeerFromBackingStore(String str) throws ReplicationException {
        try {
            if (!peerExists(str)) {
                throw new IllegalArgumentException("peer " + str + " doesn't exist");
            }
        } catch (KeeperException e) {
            throw new ReplicationException("Unable to get status of the peer with id=" + str + " from backing store", e);
        }
        try {
            return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, getPeerStateNode(str)));
        } catch (InterruptedException e) {
            // Restore the interrupt flag before wrapping, matching getReplicationPeerConfig().
            Thread.currentThread().interrupt();
            throw new ReplicationException(e);
        } catch (DeserializationException e) {
            throw new ReplicationException(e);
        } catch (KeeperException e) {
            throw new ReplicationException(e);
        }
    }

    /**
     * Collects the configuration of every peer listed under the peers znode.
     *
     * <p>Peers whose configuration cannot be obtained are logged and skipped. If listing or
     * reading fails with an exception, the abortable is asked to abort and whatever has been
     * collected so far is returned.
     *
     * @return a map sorted by peer id; empty when no peers are listed
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
        Map<String, ReplicationPeerConfig> peerConfigs = new TreeMap<>();
        try {
            List<String> peerIds = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
            // addExistingPeers() guards the same call against a null result; do the same here
            // so a null listing yields an empty map instead of a NullPointerException.
            if (peerIds == null) {
                return peerConfigs;
            }
            for (String peerId : peerIds) {
                ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
                if (peerConfig == null) {
                    LOG.warn("Failed to get replication peer configuration of clusterid=" + peerId + " znode content, continuing.");
                } else {
                    peerConfigs.put(peerId, peerConfig);
                }
            }
        } catch (ReplicationException e) {
            this.abortable.abort("Cannot get the list of peers ", e);
        } catch (KeeperException e2) {
            this.abortable.abort("Cannot get the list of peers ", e2);
        }
        return peerConfigs;
    }

    /**
     * Looks up a peer in the in-memory peer map.
     *
     * @param str id of the peer
     * @return the tracked peer, or {@code null} when the map has no entry for that id
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public ReplicationPeer getPeer(String str) {
        ReplicationPeer trackedPeer = this.peerClusters.get(str);
        return trackedPeer;
    }

    /**
     * Returns the ids of the peers held in the in-memory peer map.
     *
     * @return the map's own key set (not a copy)
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public Set<String> getPeerIds() {
        Set<String> trackedIds = this.peerClusters.keySet();
        return trackedIds;
    }

    /**
     * Reads and parses the configuration stored in a peer's znode.
     *
     * @param str id of the peer
     * @return the parsed configuration, or {@code null} when the znode has no data, the
     *         content cannot be parsed, or the thread is interrupted while reading (the
     *         interrupt flag is restored in that case)
     * @throws ReplicationException if ZooKeeper reports a {@code KeeperException}
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public ReplicationPeerConfig getReplicationPeerConfig(String str) throws ReplicationException {
        String peerZNode = ZKUtil.joinZNode(this.peersZNode, str);

        byte[] content;
        try {
            content = ZKUtil.getData(this.zookeeper, peerZNode);
        } catch (InterruptedException ie) {
            LOG.warn("Could not get configuration for peer because the thread was interrupted. peerId=" + str);
            Thread.currentThread().interrupt();
            return null;
        } catch (KeeperException ke) {
            throw new ReplicationException("Error getting configuration for peer with id=" + str, ke);
        }

        if (content == null) {
            LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + str);
            return null;
        }

        try {
            return parsePeerFrom(content);
        } catch (DeserializationException de) {
            LOG.warn("Failed to parse cluster key from peerId=" + str + ", specifically the content from the following znode: " + peerZNode);
            return null;
        }
    }

    /**
     * Builds the Hadoop configuration to use for a peer, paired with the peer's stored
     * configuration.
     *
     * <p>Starts from a copy of this instance's configuration, applies the peer's cluster key
     * when one is set, and layers the peer's own key/value settings on top when there are any.
     *
     * @param str id of the peer
     * @return the peer config together with the derived configuration, or {@code null} when
     *         the peer config is unavailable or an {@code IOException} occurs
     * @throws ReplicationException if reading the peer config fails
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String str) throws ReplicationException {
        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(str);
        if (peerConfig == null) {
            return null;
        }
        Configuration baseConf = new Configuration(this.conf);
        try {
            String clusterKey = peerConfig.getClusterKey();
            if (clusterKey != null && !clusterKey.isEmpty()) {
                ZKUtil.applyClusterKeyToConf(baseConf, clusterKey);
            }
            Configuration effectiveConf = baseConf;
            if (!peerConfig.getConfiguration().isEmpty()) {
                // Peer-specific settings are added on top of the base configuration.
                CompoundConfiguration layered = new CompoundConfiguration();
                layered.add(baseConf);
                layered.addStringMap(peerConfig.getConfiguration());
                effectiveConf = layered;
            }
            return new Pair<>(peerConfig, effectiveConf);
        } catch (IOException ioe) {
            LOG.error("Can't get peer configuration for peerId=" + str + " because:", ioe);
            return null;
        }
    }

    /**
     * Lists the children of the peers znode via {@code ZKUtil.listChildrenAndWatchThem}.
     *
     * @return the child names as returned by ZKUtil, or {@code null} when a
     *         {@code KeeperException} occurs (the abortable is asked to abort in that case)
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public List<String> getAllPeerIds() {
        try {
            return ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
        } catch (KeeperException ke) {
            this.abortable.abort("Cannot get the list of peers ", ke);
            return null;
        }
    }

    /**
     * Registers every peer currently listed under the peers znode in the in-memory peer map.
     * A null listing is treated as "nothing to add".
     *
     * @throws ReplicationException if listing the children hits a {@code KeeperException} or
     *                              registering a peer fails
     */
    private void addExistingPeers() throws ReplicationException {
        try {
            List<String> peerIds = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
            if (peerIds == null) {
                return;
            }
            for (String peerId : peerIds) {
                createAndAddPeer(peerId);
            }
        } catch (KeeperException ke) {
            throw new ReplicationException("Error getting the list of peer clusters.", ke);
        }
    }

    /**
     * Notification hook for a newly added peer; delegates to {@link #createAndAddPeer(String)}.
     *
     * @param str id of the added peer
     * @return whatever {@code createAndAddPeer} returns for that id
     * @throws ReplicationException if creating the peer fails
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public boolean peerAdded(String str) throws ReplicationException {
        boolean registered = createAndAddPeer(str);
        return registered;
    }

    /**
     * Drops a peer from the in-memory peer map, if it is present.
     *
     * @param str id of the removed peer
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationPeers
    public void peerRemoved(String str) {
        ReplicationPeerZKImpl tracked = this.peerClusters.get(str);
        if (tracked == null) {
            return;
        }
        // Two-argument remove: the entry is only dropped if it still maps to the peer read above.
        ConcurrentMap<String, ReplicationPeerZKImpl> peers = (ConcurrentMap<String, ReplicationPeerZKImpl>) this.peerClusters;
        peers.remove(str, tracked);
    }

    /**
     * Creates a peer object for the given id and puts it into the in-memory peer map unless
     * an entry for that id is already there.
     *
     * @param str id of the peer
     * @return {@code false} when the peer map is null, the id is already tracked, or no peer
     *         could be created; {@code true} otherwise, including when another entry for the
     *         id appeared before this one could be inserted
     * @throws ReplicationException wrapping any exception raised while creating or
     *                              registering the peer
     */
    public boolean createAndAddPeer(String str) throws ReplicationException {
        if (this.peerClusters == null) {
            return false;
        }
        if (this.peerClusters.containsKey(str)) {
            return false;
        }
        try {
            ReplicationPeerZKImpl candidate = createPeer(str);
            if (candidate == null) {
                return false;
            }
            ConcurrentMap<String, ReplicationPeerZKImpl> peers = (ConcurrentMap<String, ReplicationPeerZKImpl>) this.peerClusters;
            ReplicationPeerZKImpl existing = peers.putIfAbsent(str, candidate);
            String message;
            if (existing == null) {
                message = "Added new peer cluster=" + candidate.getPeerConfig().getClusterKey();
            } else {
                message = "Peer already present, " + existing.getPeerConfig().getClusterKey() + ", new cluster=" + candidate.getPeerConfig().getClusterKey();
            }
            LOG.info(message);
            return true;
        } catch (Exception ex) {
            throw new ReplicationException("Error adding peer with id=" + str, ex);
        }
    }

    /**
     * Builds the path of a peer's tableCFs znode by joining the peers znode, the peer id and
     * the tableCFs node name.
     *
     * @param str id of the peer
     * @return the joined znode path
     */
    private String getTableCFsNode(String str) {
        String relativePath = ZKUtil.joinZNode(str, this.tableCFsNodeName);
        return ZKUtil.joinZNode(this.peersZNode, relativePath);
    }

    /**
     * Builds the path of a peer's state znode by joining the peers znode, the peer id and the
     * peer-state node name.
     *
     * @param str id of the peer
     * @return the joined znode path
     */
    private String getPeerStateNode(String str) {
        String relativePath = ZKUtil.joinZNode(str, this.peerStateNodeName);
        return ZKUtil.joinZNode(this.peersZNode, relativePath);
    }

    /**
     * Writes the requested state into the peer's state znode, creating the znode if it does
     * not exist yet.
     *
     * @param str   id of the peer
     * @param state target state; ENABLED selects {@code ENABLED_ZNODE_BYTES}, any other value
     *              selects {@code DISABLED_ZNODE_BYTES}
     * @throws IllegalArgumentException if no peer with that id exists
     * @throws ReplicationException     if ZooKeeper reports a {@code KeeperException}
     */
    private void changePeerState(String str, ZooKeeperProtos.ReplicationState.State state) throws ReplicationException {
        try {
            if (!peerExists(str)) {
                throw new IllegalArgumentException("Cannot enable/disable peer because id=" + str + " does not exist.");
            }
            String stateZNode = getPeerStateNode(str);
            byte[] stateBytes;
            if (state == ZooKeeperProtos.ReplicationState.State.ENABLED) {
                stateBytes = ENABLED_ZNODE_BYTES;
            } else {
                stateBytes = DISABLED_ZNODE_BYTES;
            }
            // checkExists() == -1 is treated as "znode absent".
            boolean znodeAbsent = ZKUtil.checkExists(this.zookeeper, stateZNode) == -1;
            if (znodeAbsent) {
                ZKUtil.createAndWatch(this.zookeeper, stateZNode, stateBytes);
            } else {
                ZKUtil.setData(this.zookeeper, stateZNode, stateBytes);
            }
            LOG.info("Peer with id= " + str + " is now " + state.name());
        } catch (KeeperException ke) {
            throw new ReplicationException("Unable to change state of the peer with id=" + str, ke);
        }
    }

    /**
     * Instantiates a peer object for the given id and starts its state tracker followed by its
     * tableCFs tracker.
     *
     * @param str id of the peer
     * @return the new peer, or {@code null} when no configuration is available for the id
     * @throws ReplicationException if fetching the configuration fails or either tracker
     *                              fails to start with a {@code KeeperException}
     */
    private ReplicationPeerZKImpl createPeer(String str) throws ReplicationException {
        Pair<ReplicationPeerConfig, Configuration> confPair = getPeerConf(str);
        if (confPair == null) {
            return null;
        }
        ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(confPair.getSecond(), str, confPair.getFirst());

        try {
            peer.startStateTracker(this.zookeeper, getPeerStateNode(str));
        } catch (KeeperException ke) {
            throw new ReplicationException("Error starting the peer state tracker for peerId=" + str, ke);
        }

        try {
            peer.startTableCFsTracker(this.zookeeper, getTableCFsNode(str));
        } catch (KeeperException ke) {
            throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + str, ke);
        }

        return peer;
    }

    /**
     * Decodes the content of a peer znode into a peer configuration.
     *
     * <p>Content carrying the protobuf magic prefix is parsed as a
     * {@code ZooKeeperProtos.ReplicationPeer} message. Any other content is taken verbatim as
     * the cluster key (an empty array yields an empty cluster key).
     *
     * @param bArr raw znode content
     * @return the decoded peer configuration
     * @throws DeserializationException if the protobuf payload cannot be read
     */
    private static ReplicationPeerConfig parsePeerFrom(byte[] bArr) throws DeserializationException {
        if (ProtobufUtil.isPBMagicPrefix(bArr)) {
            int magicLength = ProtobufUtil.lengthOfPBMagic();
            ZooKeeperProtos.ReplicationPeer.Builder peerBuilder = ZooKeeperProtos.ReplicationPeer.newBuilder();
            try {
                // Skip the magic prefix and merge only the message bytes that follow it.
                ProtobufUtil.mergeFrom(peerBuilder, bArr, magicLength, bArr.length - magicLength);
                return convert(peerBuilder.build());
            } catch (IOException ioe) {
                throw new DeserializationException(ioe);
            }
        }
        ReplicationPeerConfig plainConfig = new ReplicationPeerConfig();
        if (bArr.length > 0) {
            return plainConfig.setClusterKey(Bytes.toString(bArr));
        }
        return plainConfig.setClusterKey("");
    }

    /**
     * Converts a protobuf {@code ReplicationPeer} message into a {@code ReplicationPeerConfig}.
     *
     * <p>The cluster key and replication endpoint class are copied only when present in the
     * message; all data pairs and configuration pairs are copied.
     *
     * @param replicationPeer protobuf message to convert
     * @return a new peer configuration populated from the message
     */
    private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer replicationPeer) {
        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
        if (replicationPeer.hasClusterkey()) {
            peerConfig.setClusterKey(replicationPeer.getClusterkey());
        }
        if (replicationPeer.hasReplicationEndpointImpl()) {
            peerConfig.setReplicationEndpointImpl(replicationPeer.getReplicationEndpointImpl());
        }
        for (HBaseProtos.BytesBytesPair dataPair : replicationPeer.getDataList()) {
            byte[] key = dataPair.getFirst().toByteArray();
            byte[] value = dataPair.getSecond().toByteArray();
            peerConfig.getPeerData().put(key, value);
        }
        for (HBaseProtos.NameStringPair confPair : replicationPeer.getConfigurationList()) {
            String name = confPair.getName();
            String value = confPair.getValue();
            peerConfig.getConfiguration().put(name, value);
        }
        return peerConfig;
    }

    /**
     * Converts a {@code ReplicationPeerConfig} into its protobuf {@code ReplicationPeer} form.
     *
     * <p>The cluster key and replication endpoint class are written only when non-null; every
     * peer-data entry and configuration entry is written.
     *
     * @param replicationPeerConfig peer configuration to convert
     * @return the built protobuf message
     */
    private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig replicationPeerConfig) {
        ZooKeeperProtos.ReplicationPeer.Builder peerBuilder = ZooKeeperProtos.ReplicationPeer.newBuilder();
        if (replicationPeerConfig.getClusterKey() != null) {
            peerBuilder.setClusterkey(replicationPeerConfig.getClusterKey());
        }
        if (replicationPeerConfig.getReplicationEndpointImpl() != null) {
            peerBuilder.setReplicationEndpointImpl(replicationPeerConfig.getReplicationEndpointImpl());
        }
        for (Map.Entry<byte[], byte[]> dataEntry : replicationPeerConfig.getPeerData().entrySet()) {
            HBaseProtos.BytesBytesPair.Builder pairBuilder = HBaseProtos.BytesBytesPair.newBuilder();
            pairBuilder.setFirst(ByteString.copyFrom(dataEntry.getKey()));
            pairBuilder.setSecond(ByteString.copyFrom(dataEntry.getValue()));
            peerBuilder.addData(pairBuilder.build());
        }
        for (Map.Entry<String, String> confEntry : replicationPeerConfig.getConfiguration().entrySet()) {
            HBaseProtos.NameStringPair.Builder pairBuilder = HBaseProtos.NameStringPair.newBuilder();
            pairBuilder.setName(confEntry.getKey());
            pairBuilder.setValue(confEntry.getValue());
            peerBuilder.addConfiguration(pairBuilder.build());
        }
        return peerBuilder.build();
    }

    /**
     * Serializes a peer configuration to its protobuf bytes with the PB magic prefix prepended.
     *
     * @param replicationPeerConfig peer configuration to serialize
     * @return the prefixed protobuf bytes
     */
    private static byte[] toByteArray(ReplicationPeerConfig replicationPeerConfig) {
        ZooKeeperProtos.ReplicationPeer message = convert(replicationPeerConfig);
        byte[] serialized = message.toByteArray();
        return ProtobufUtil.prependPBMagic(serialized);
    }
}
