package org.apache.hadoop.hdfs.qjournal.server;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.4.205-eep-911.jar:org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.class */
public class JournalNodeSyncer {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    public static final Logger LOG;
    /** JournalNode this syncer belongs to; its bound IPC address identifies the local node. */
    private final JournalNode jn;
    /** Local journal whose edit log segments are compared against the other JournalNodes. */
    private final Journal journal;
    /** Journal identifier sent with manifest requests and used to build download paths. */
    private final String jid;
    /** Nameservice id; may be replaced by a non-null argument passed to {@link #start(String)}. */
    private String nameServiceId;
    /** Storage of {@link #journal}, obtained once in the constructor. */
    private final JNStorage jnStorage;
    /** Configuration used to look up the shared edits URI and to build RPC proxies. */
    private final Configuration conf;
    /** Background sync thread; stays null until the daemon is started. */
    private volatile Daemon syncJournalDaemon;
    /** Number of entries in {@link #otherJNProxies}, recorded after the proxies are created. */
    private int numOtherJNs;
    /** Sleep time, in milliseconds, between two sync attempts of the daemon. */
    private final long journalSyncInterval;
    /** Timeout value handed to the edit log download call. */
    private final int logSegmentTransferTimeout;
    /** Download throttler; null when no positive transfer rate is configured. */
    private final DataTransferThrottler throttler;
    /** Metrics of {@link #journal}; incremented for each successfully synced segment. */
    private final JournalMetrics metrics;
    /** Compiler-generated assertion switch, surfaced as a field by the decompiler. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Loop condition of the sync daemon; cleared by {@link #stopSync()}. */
    private volatile boolean shouldSync = true;
    /** Proxies to the other JournalNodes; filled when the syncer is started. */
    private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList();
    /** Index into {@link #otherJNProxies} of the node to sync with next; advanced modulo {@link #numOtherJNs}. */
    private int journalNodeIndexForSync = 0;
    /** Set once the sync daemon has been started so that later start calls are no-ops. */
    private boolean journalSyncerStarted = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.4.205-eep-911.jar:org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer$JournalNodeProxy.class */
    /**
     * Pairs the RPC address of another JournalNode with the inter-JournalNode
     * protocol proxy created for it, plus the HTTP server URL that is filled in
     * later by the enclosing class.
     */
    public class JournalNodeProxy {
        private final InetSocketAddress jnAddr;
        private final InterQJournalProtocol jnProxy;
        /** Set by the enclosing class the first time a segment is downloaded from this node. */
        private URL httpServerUrl;

        /**
         * Creates the RPC proxy for the given address while running as the login user.
         *
         * @param inetSocketAddress RPC address of the other JournalNode
         * @throws IOException if the proxy cannot be created
         */
        JournalNodeProxy(final InetSocketAddress inetSocketAddress) throws IOException {
            // Work on a private copy so the protocol engine setting stays local to this proxy.
            final Configuration rpcConf = new Configuration(JournalNodeSyncer.this.conf);
            this.jnAddr = inetSocketAddress;
            this.jnProxy = SecurityUtil.doAsLoginUser(() -> {
                RPC.setProtocolEngine(rpcConf, InterQJournalProtocolPB.class, ProtobufRpcEngine2.class);
                final long protocolVersion = RPC.getProtocolVersion(InterQJournalProtocolPB.class);
                final InterQJournalProtocolPB rpcProxy = (InterQJournalProtocolPB) RPC.getProxy(
                        InterQJournalProtocolPB.class, protocolVersion, inetSocketAddress, rpcConf);
                return new InterQJournalProtocolTranslatorPB(rpcProxy);
            });
        }

        /** @return the string form of the remote JournalNode address */
        @Override
        public String toString() {
            return String.valueOf(this.jnAddr.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a syncer for one journal. Nothing is started here; call
     * {@link #start(String)} to create the proxies and launch the daemon.
     *
     * @param journalNode   JournalNode owning this syncer
     * @param journal       local journal to keep in sync
     * @param str           journal id
     * @param configuration configuration to read sync settings from
     * @param str2          nameservice id, possibly null
     */
    public JournalNodeSyncer(JournalNode journalNode, Journal journal, String str, Configuration configuration, String str2) {
        // Identity of this syncer.
        jn = journalNode;
        this.journal = journal;
        jid = str;
        nameServiceId = str2;

        jnStorage = journal.getStorage();
        conf = configuration;

        // Tunables read once at construction time.
        journalSyncInterval =
                configuration.getLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 120000L);
        logSegmentTransferTimeout =
                configuration.getInt(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, 30000);
        throttler = getThrottler(configuration);

        metrics = journal.getMetrics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the sync daemon to stop: clears the run flag, removes the edits sync
     * directory if it exists, and interrupts the daemon thread when one was started.
     */
    public void stopSync() {
        shouldSync = false;

        final File syncDir = journal.getStorage().getEditsSyncDir();
        if (syncDir.exists()) {
            FileUtil.fullyDelete(syncDir);
        }

        final Daemon daemon = syncJournalDaemon;
        if (daemon == null) {
            return;
        }
        daemon.interrupt();
    }

    /**
     * Starts the sync daemon unless it is already running or no other
     * JournalNode proxy could be created.
     *
     * @param str nameservice id to use from now on; ignored when null
     */
    public void start(String str) {
        if (str != null) {
            nameServiceId = str;
            journal.setTriedJournalSyncerStartedwithnsId(true);
        }
        if (journalSyncerStarted) {
            return;
        }
        if (!getOtherJournalNodeProxies()) {
            return;
        }
        LOG.info("Starting SyncJournal daemon for journal " + jid);
        startSyncJournalsDaemon();
        journalSyncerStarted = true;
    }

    /** @return true once {@link #start(String)} has launched the sync daemon */
    public boolean isJournalSyncerStarted() {
        return journalSyncerStarted;
    }

    /**
     * Makes sure the edits sync directory of the journal storage exists.
     *
     * @return true if the directory already existed or was created, false if creation failed
     */
    private boolean createEditsSyncDir() {
        final File syncDir = journal.getStorage().getEditsSyncDir();
        if (syncDir.exists()) {
            LOG.info(syncDir + " directory already exists.");
            return true;
        }
        return syncDir.mkdir();
    }

    /**
     * Creates a proxy for every other JournalNode address and records how many
     * were created. Addresses whose proxy cannot be built are skipped with a warning.
     *
     * @return true if at least one proxy is available, false otherwise
     */
    private boolean getOtherJournalNodeProxies() {
        final List<InetSocketAddress> peerAddrs = getOtherJournalNodeAddrs();
        final boolean noPeers = peerAddrs == null || peerAddrs.isEmpty();
        if (noPeers) {
            LOG.warn("Other JournalNode addresses not available. Journal Syncing cannot be done");
            return false;
        }

        for (InetSocketAddress peerAddr : peerAddrs) {
            try {
                otherJNProxies.add(new JournalNodeProxy(peerAddr));
            } catch (IOException ioe) {
                LOG.warn("Could not add proxy for Journal at addresss " + peerAddr, ioe);
            }
        }

        if (!otherJNProxies.isEmpty()) {
            numOtherJNs = otherJNProxies.size();
            return true;
        }
        LOG.error("Cannot sync as there is no other JN available for sync.");
        return false;
    }

    /**
     * Creates and starts the daemon thread that periodically syncs this journal.
     * The thread first waits until the journal is formatted, then creates the
     * edits sync directory, and afterwards runs one sync attempt per interval
     * until {@link #shouldSync} is cleared or the thread is interrupted.
     */
    private void startSyncJournalsDaemon() {
        final Runnable syncLoop = () -> {
            // Phase 1: wait for the journal to be formatted.
            while (!journal.isFormatted()) {
                try {
                    Thread.sleep(journalSyncInterval);
                } catch (InterruptedException waitInterrupted) {
                    LOG.error("JournalNodeSyncer daemon received Runtime exception.", waitInterrupted);
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            // Phase 2: the download directory must exist before any sync attempt.
            if (!createEditsSyncDir()) {
                LOG.error("Failed to create directory for downloading log segments: {}. Stopping Journal Node Sync.", journal.getStorage().getEditsSyncDir());
                return;
            }

            // Phase 3: periodic sync.
            while (shouldSync) {
                try {
                    if (!journal.isFormatted()) {
                        LOG.warn("Journal cannot sync. Not formatted.");
                    } else {
                        syncJournals();
                    }
                } catch (Throwable failure) {
                    final boolean interrupted = failure instanceof InterruptedException;
                    if (!shouldSync) {
                        // Shutdown was requested while the attempt was running.
                        if (interrupted) {
                            LOG.info("Stopping JournalNode Sync.");
                            Thread.currentThread().interrupt();
                        } else {
                            LOG.warn("JournalNodeSyncer received an exception while shutting down.", failure);
                        }
                        return;
                    }
                    if (interrupted) {
                        LOG.warn("JournalNodeSyncer interrupted", failure);
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Any other failure is logged and the loop carries on.
                    LOG.error("JournalNodeSyncer daemon received Runtime exception. ", failure);
                }

                try {
                    Thread.sleep(journalSyncInterval);
                } catch (InterruptedException sleepInterrupted) {
                    if (!shouldSync) {
                        LOG.info("Stopping JournalNode Sync.");
                    } else {
                        LOG.warn("JournalNodeSyncer interrupted", sleepInterrupted);
                    }
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };

        final Daemon daemon = new Daemon(syncLoop);
        syncJournalDaemon = daemon;
        daemon.start();
    }

    /**
     * Syncs with the JournalNode at the current round-robin position and then
     * advances the position, wrapping around after the last proxy.
     */
    private void syncJournals() {
        final int current = journalNodeIndexForSync;
        syncWithJournalAtIndex(current);
        journalNodeIndexForSync = (current + 1) % numOtherJNs;
    }

    /**
     * Compares the local edit log manifest with the manifest of the JournalNode
     * at the given index and downloads the segments that are missing locally.
     *
     * <p>The local and the remote manifest lookups are guarded separately so
     * that each failure is reported with its own message: a local failure is an
     * error, a remote failure is logged at debug level. In both cases the sync
     * attempt ends without downloading anything.
     *
     * @param i index into {@link #otherJNProxies} of the JournalNode to sync with
     */
    private void syncWithJournalAtIndex(int i) {
        final JournalNodeProxy remoteJournal = this.otherJNProxies.get(i);
        LOG.info("Syncing Journal " + this.jn.getBoundIpcAddress().getAddress() + ":" + this.jn.getBoundIpcAddress().getPort() + " with " + remoteJournal + ", journal id: " + this.jid);

        final InterQJournalProtocol remoteRpc = remoteJournal.jnProxy;
        if (remoteRpc == null) {
            LOG.error("JournalNode Proxy not found.");
            return;
        }

        // Step 1: what do we have locally?
        final List<RemoteEditLog> localLogs;
        try {
            localLogs = this.journal.getEditLogManifest(0L, false).getLogs();
        } catch (IOException e) {
            LOG.error("Exception in getting local edit log manifest", e);
            return;
        }

        // Step 2: what does the other JournalNode have?
        final QJournalProtocolProtos.GetEditLogManifestResponseProto remoteManifest;
        try {
            remoteManifest = remoteRpc.getEditLogManifestFromJournal(this.jid, this.nameServiceId, 0L, false);
        } catch (IOException e) {
            // Report the node addressed by the parameter, not by the round-robin field.
            LOG.debug("Could not sync with Journal at {}.", remoteJournal, e);
            return;
        }

        // Step 3: fetch whatever is missing locally.
        getMissingLogSegments(localLogs, remoteManifest, remoteJournal);
    }

    /**
     * Resolves the addresses of the other JournalNodes from the shared edits
     * directory configuration.
     *
     * <p>The URI is looked up in this order: the plain shared edits key, the key
     * suffixed with the nameservice id, and finally the keys suffixed with
     * {@code <nameserviceId>.<namenodeId>} for every NameNode id of the
     * nameservice. In the last case all NameNodes must agree on one value;
     * otherwise the configuration is rejected.
     *
     * @return the other JournalNode addresses, or null if the URI is missing,
     *         inconsistent or cannot be parsed
     */
    private List<InetSocketAddress> getOtherJournalNodeAddrs() {
        // Declared outside the try block so the failing value can be logged.
        String sharedEditsUri = null;
        try {
            sharedEditsUri = this.conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
            if ((sharedEditsUri == null || sharedEditsUri.isEmpty()) && this.nameServiceId != null) {
                sharedEditsUri = this.conf.getTrimmed("dfs.namenode.shared.edits.dir." + this.nameServiceId);
            }
            if ((sharedEditsUri == null || sharedEditsUri.isEmpty()) && this.nameServiceId != null) {
                // Fall back to the per-NameNode keys and collect the distinct values.
                final HashSet<String> distinctUris = Sets.newHashSet();
                for (String nameNodeId : DFSUtilClient.getNameNodeIds(this.conf, this.nameServiceId)) {
                    final String suffix = this.nameServiceId + "." + nameNodeId;
                    sharedEditsUri = this.conf.getTrimmed("dfs.namenode.shared.edits.dir." + suffix);
                    distinctUris.add(sharedEditsUri);
                }
                if (distinctUris.size() > 1) {
                    sharedEditsUri = null;
                    LOG.error("The conf property dfs.namenode.shared.edits.dir not set properly, it has been configured with different journalnode values " + distinctUris + " for a single nameserviceId " + this.nameServiceId);
                }
            }
            if (sharedEditsUri != null && !sharedEditsUri.isEmpty()) {
                return getJournalAddrList(sharedEditsUri);
            }
            LOG.error("Could not construct Shared Edits Uri");
            return null;
        } catch (IOException e) {
            LOG.error("Could not parse JournalNode addresses: " + sharedEditsUri, e);
            return null;
        } catch (URISyntaxException e) {
            LOG.error("The conf property dfs.namenode.shared.edits.dir not set properly.", e);
            return null;
        }
    }

    /**
     * Parses the shared edits URI into JournalNode addresses, passing this
     * node's bound IPC address as the set handed to the address lookup.
     *
     * @param str shared edits URI string
     * @return addresses returned by the lookup
     * @throws URISyntaxException if {@code str} is not a valid URI
     * @throws IOException if the address lookup fails
     */
    private List<InetSocketAddress> getJournalAddrList(String str) throws URISyntaxException, IOException {
        final URI sharedEditsUri = new URI(str);
        final HashSet<InetSocketAddress> ownAddress = Sets.newHashSet(jn.getBoundIpcAddress());
        return Util.getLoggerAddresses(sharedEditsUri, ownAddress);
    }

    /**
     * Downloads every edit log segment that the other JournalNode has and this
     * node lacks. The first failed download aborts the remaining ones.
     *
     * @param list local edit logs
     * @param getEditLogManifestResponseProto manifest response of the other JournalNode
     * @param journalNodeProxy proxy of the other JournalNode; its HTTP server URL
     *        is filled in from the response on first use
     */
    private void getMissingLogSegments(List<RemoteEditLog> list, QJournalProtocolProtos.GetEditLogManifestResponseProto getEditLogManifestResponseProto, JournalNodeProxy journalNodeProxy) {
        List<RemoteEditLog> remoteLogs = PBHelper.convert(getEditLogManifestResponseProto.getManifest()).getLogs();
        if (remoteLogs == null || remoteLogs.isEmpty()) {
            LOG.warn("Journal at " + journalNodeProxy.jnAddr + " has no edit logs");
            return;
        }
        List<RemoteEditLog> missingLogs = getMissingLogList(list, remoteLogs);
        if (missingLogs.isEmpty()) {
            return;
        }
        NamespaceInfo namespaceInfo = this.jnStorage.getNamespaceInfo();
        for (RemoteEditLog missingLog : missingLogs) {
            // Kept outside the try block so the catch-all can report which URL failed.
            URL url = null;
            boolean success = false;
            try {
                if (journalNodeProxy.httpServerUrl == null) {
                    if (!getEditLogManifestResponseProto.hasFromURL()) {
                        LOG.error("EditLogManifest response does not have fromUrl field set. Aborting current sync attempt");
                        return;
                    }
                    journalNodeProxy.httpServerUrl = getHttpServerURI(getEditLogManifestResponseProto.getFromURL(), journalNodeProxy.jnAddr.getHostName());
                }
                String urlPath = GetJournalEditServlet.buildPath(this.jid, missingLog.getStartTxId(), namespaceInfo, false);
                url = new URL(journalNodeProxy.httpServerUrl, urlPath);
                success = downloadMissingLogSegment(url, missingLog);
            } catch (MalformedURLException e) {
                LOG.error("MalformedURL when download missing log segment", e);
            } catch (URISyntaxException e) {
                LOG.error("EditLogManifest's fromUrl field syntax incorrect", e);
            } catch (Exception e) {
                LOG.error("Exception in downloading missing log segment from url " + url, e);
            }
            if (!success) {
                LOG.error("Aborting current sync attempt.");
                return;
            }
        }
    }

    /**
     * Returns the entries of {@code list2} whose start transaction id does not
     * occur in {@code list}, walking both lists in step by start transaction id.
     * When {@code list} is empty, {@code list2} itself is returned.
     *
     * @param list  local edit logs
     * @param list2 edit logs of the other JournalNode
     * @return edit logs present remotely but missing locally
     */
    private List<RemoteEditLog> getMissingLogList(List<RemoteEditLog> list, List<RemoteEditLog> list2) {
        if (list.isEmpty()) {
            return list2;
        }

        final List<RemoteEditLog> missing = Lists.newArrayList();
        final int localCount = list.size();
        final int remoteCount = list2.size();
        int localPos = 0;
        int remotePos = 0;

        while (localPos < localCount && remotePos < remoteCount) {
            final long localStart = list.get(localPos).getStartTxId();
            final long remoteStart = list2.get(remotePos).getStartTxId();
            if (localStart < remoteStart) {
                // Local-only segment: nothing to fetch.
                localPos++;
            } else if (localStart > remoteStart) {
                // Remote segment with no local counterpart.
                missing.add(list2.get(remotePos));
                remotePos++;
            } else {
                localPos++;
                remotePos++;
            }
        }

        // Everything left on the remote side is missing locally.
        missing.addAll(list2.subList(remotePos, remoteCount));
        return missing;
    }

    /**
     * Builds a URL that takes scheme and port from the given URI string and
     * the host from the second argument, with an empty file part.
     *
     * @param str  URI string supplying scheme and port
     * @param str2 host name to use in the resulting URL
     * @return the combined URL
     * @throws URISyntaxException if {@code str} is not a valid URI
     * @throws MalformedURLException if the URL cannot be constructed
     */
    private URL getHttpServerURI(String str, String str2) throws URISyntaxException, MalformedURLException {
        final URI fromUri = new URI(str);
        final String scheme = fromUri.getScheme();
        final int port = fromUri.getPort();
        return new URL(scheme, str2, port, "");
    }

    /**
     * Downloads one edit log segment into a temporary file and moves it into
     * the current directory of the journal.
     *
     * <p>The download is skipped when the finalized file already exists and is
     * readable. The temporary file is removed after the move attempt whatever
     * its outcome.
     *
     * @param url           URL to download the segment from
     * @param remoteEditLog segment to download
     * @return true if the segment is available locally afterwards, false if the
     *         download or the move failed
     * @throws IOException if running the download as the login user fails
     */
    private boolean downloadMissingLogSegment(URL url, RemoteEditLog remoteEditLog) throws IOException {
        LOG.info("Downloading missing Edit Log from " + url + " to " + this.jnStorage.getRoot());
        // Decompiled form of an assert statement; left as is because the class
        // declares the $assertionsDisabled field explicitly.
        if (!$assertionsDisabled && (remoteEditLog.getStartTxId() <= 0 || remoteEditLog.getEndTxId() <= 0)) {
            throw new AssertionError("bad log: " + remoteEditLog);
        }
        final File finalizedEditsFile = this.jnStorage.getFinalizedEditsFile(remoteEditLog.getStartTxId(), remoteEditLog.getEndTxId());
        if (finalizedEditsFile.exists() && FileUtil.canRead(finalizedEditsFile)) {
            LOG.info("Skipping download of remote edit log " + remoteEditLog + " since it's already stored locally at " + finalizedEditsFile);
            return true;
        }
        final File temporaryEditsFile = this.jnStorage.getTemporaryEditsFile(remoteEditLog.getStartTxId(), remoteEditLog.getEndTxId());
        final boolean downloaded = SecurityUtil.doAsLoginUser(() -> {
            if (UserGroupInformation.isSecurityEnabled()) {
                UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
            }
            try {
                Util.doGetUrl(url, ImmutableList.of(temporaryEditsFile), this.jnStorage, false, this.logSegmentTransferTimeout, this.throttler);
                return true;
            } catch (IOException e) {
                LOG.error("Download of Edit Log file for Syncing failed. Deleting temp file: " + temporaryEditsFile, e);
                if (!temporaryEditsFile.delete()) {
                    LOG.warn("Deleting " + temporaryEditsFile + " has failed");
                }
                return false;
            }
        });
        if (!downloaded) {
            return false;
        }
        LOG.info("Downloaded file " + temporaryEditsFile.getName() + " of size " + temporaryEditsFile.length() + " bytes.");
        boolean moved = false;
        try {
            moved = this.journal.moveTmpSegmentToCurrent(temporaryEditsFile, finalizedEditsFile, remoteEditLog.getEndTxId());
        } catch (IOException e) {
            // Keep the cause in the log; the failure is reported through the return value.
            LOG.info("Could not move {} to current directory.", temporaryEditsFile, e);
        } finally {
            // Single cleanup point for success, IOException and any other throwable.
            if (temporaryEditsFile.exists() && !temporaryEditsFile.delete()) {
                LOG.warn("Deleting " + temporaryEditsFile + " has failed");
            }
        }
        if (!moved) {
            return false;
        }
        this.metrics.incrNumEditLogsSynced();
        return true;
    }

    /**
     * Creates the throttler used for edit log downloads.
     *
     * @param configuration configuration holding the edit log transfer rate
     * @return a throttler for the configured rate, or null when the rate is not positive
     */
    private static DataTransferThrottler getThrottler(Configuration configuration) {
        final long transferRate = configuration.getLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY, 0L);
        return transferRate > 0 ? new DataTransferThrottler(transferRate) : null;
    }

    static {
        $assertionsDisabled = !JournalNodeSyncer.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) JournalNodeSyncer.class);
    }
}
