package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.codehaus.jackson.util.MinimalPrettyPrinter;

/* JADX INFO: Access modifiers changed from: package-private */
/* JADX WARN: Classes with same name are omitted:
  input_file:webhdfs.war:WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor.class
  input_file:webhdfs/WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor.class
 */
@InterfaceAudience.Private
/* loaded from: input_file:hadoop-hdfs-httpfs-2.5.1-mapr-1410-SNAPSHOT/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor.class */
public class BPServiceActor implements Runnable {
    static final Log LOG;
    final InetSocketAddress nnAddr;
    HAServiceProtocol.HAServiceState state;
    final BPOfferService bpos;
    Thread bpThread;
    DatanodeProtocolClientSideTranslatorPB bpNamenode;
    private final DataNode dn;
    private final DNConf dnConf;
    private DatanodeRegistration bpRegistration;
    static final /* synthetic */ boolean $assertionsDisabled;
    volatile long lastBlockReport = 0;
    volatile long lastDeletedReport = 0;
    boolean resetBlockReportTime = true;
    volatile long lastCacheReport = 0;
    private volatile long lastHeartbeat = 0;
    private volatile RunningState runningState = RunningState.CONNECTING;
    private final Map<DatanodeStorage, PerStoragePendingIncrementalBR> pendingIncrementalBRperStorage = Maps.newHashMap();
    private volatile boolean sendImmediateIBR = false;
    private volatile boolean shouldServiceRun = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:webhdfs.war:WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$PerStoragePendingIncrementalBR.class
      input_file:webhdfs/WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$PerStoragePendingIncrementalBR.class
     */
    /* loaded from: input_file:hadoop-hdfs-httpfs-2.5.1-mapr-1410-SNAPSHOT/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$PerStoragePendingIncrementalBR.class */
    public static class PerStoragePendingIncrementalBR {
        private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR;

        private PerStoragePendingIncrementalBR() {
            this.pendingIncrementalBR = Maps.newHashMap();
        }

        int getBlockInfoCount() {
            return this.pendingIncrementalBR.size();
        }

        ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
            ReceivedDeletedBlockInfo[] receivedDeletedBlockInfoArr = (ReceivedDeletedBlockInfo[]) this.pendingIncrementalBR.values().toArray(new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
            this.pendingIncrementalBR.clear();
            return receivedDeletedBlockInfoArr;
        }

        int putMissingBlockInfos(ReceivedDeletedBlockInfo[] receivedDeletedBlockInfoArr) {
            int i = 0;
            for (ReceivedDeletedBlockInfo receivedDeletedBlockInfo : receivedDeletedBlockInfoArr) {
                if (!this.pendingIncrementalBR.containsKey(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId()))) {
                    this.pendingIncrementalBR.put(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId()), receivedDeletedBlockInfo);
                    i++;
                }
            }
            return i;
        }

        void putBlockInfo(ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
            this.pendingIncrementalBR.put(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId()), receivedDeletedBlockInfo);
        }

        boolean removeBlockInfo(ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
            return this.pendingIncrementalBR.remove(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId())) != null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:webhdfs.war:WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$RunningState.class
      input_file:webhdfs/WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$RunningState.class
     */
    /* loaded from: input_file:hadoop-hdfs-httpfs-2.5.1-mapr-1410-SNAPSHOT/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.5.1-mapr-1410-SNAPSHOT.jar:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$RunningState.class */
    public enum RunningState {
        CONNECTING,
        INIT_FAILED,
        RUNNING,
        EXITED,
        FAILED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BPServiceActor(InetSocketAddress inetSocketAddress, BPOfferService bPOfferService) {
        this.bpos = bPOfferService;
        this.dn = bPOfferService.getDataNode();
        this.nnAddr = inetSocketAddress;
        this.dnConf = this.dn.getDnConf();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isAlive() {
        if (this.shouldServiceRun && this.bpThread.isAlive()) {
            return this.runningState == RunningState.RUNNING || this.runningState == RunningState.CONNECTING;
        }
        return false;
    }

    public String toString() {
        return this.bpos.toString() + " service to " + this.nnAddr;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InetSocketAddress getNNSocketAddress() {
        return this.nnAddr;
    }

    @VisibleForTesting
    void setNameNode(DatanodeProtocolClientSideTranslatorPB datanodeProtocolClientSideTranslatorPB) {
        this.bpNamenode = datanodeProtocolClientSideTranslatorPB;
    }

    @VisibleForTesting
    DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
        return this.bpNamenode;
    }

    @VisibleForTesting
    NamespaceInfo retrieveNamespaceInfo() throws IOException {
        NamespaceInfo namespaceInfo = null;
        while (shouldRun()) {
            try {
                namespaceInfo = this.bpNamenode.versionRequest();
                LOG.debug(this + " received versionRequest response: " + namespaceInfo);
                break;
            } catch (SocketTimeoutException e) {
                LOG.warn("Problem connecting to server: " + this.nnAddr);
                sleepAndLogInterrupts(5000, "requesting version info from NN");
            } catch (IOException e2) {
                LOG.warn("Problem connecting to server: " + this.nnAddr);
                sleepAndLogInterrupts(5000, "requesting version info from NN");
            }
        }
        if (namespaceInfo == null) {
            throw new IOException("DN shut down before block pool connected");
        }
        checkNNVersion(namespaceInfo);
        return namespaceInfo;
    }

    private void checkNNVersion(NamespaceInfo namespaceInfo) throws IncorrectVersionException {
        String softwareVersion = namespaceInfo.getSoftwareVersion();
        String minimumNameNodeVersion = this.dnConf.getMinimumNameNodeVersion();
        if (VersionUtil.compareVersions(softwareVersion, minimumNameNodeVersion) < 0) {
            IncorrectVersionException incorrectVersionException = new IncorrectVersionException(minimumNameNodeVersion, softwareVersion, "NameNode", "DataNode");
            LOG.warn(incorrectVersionException.getMessage());
            throw incorrectVersionException;
        }
        String version = VersionInfo.getVersion();
        if (softwareVersion.equals(version)) {
            return;
        }
        LOG.info("Reported NameNode version '" + softwareVersion + "' does not match DataNode version '" + version + "' but is within acceptable limits. Note: This is normal during a rolling upgrade.");
    }

    private void connectToNNAndHandshake() throws IOException {
        this.bpNamenode = this.dn.connectToNN(this.nnAddr);
        this.bpos.verifyAndSetNamespaceInfo(retrieveNamespaceInfo());
        register();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scheduleBlockReport(long j) {
        if (j > 0) {
            this.lastBlockReport = Time.now() - (this.dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int) j));
        } else {
            this.lastBlockReport = this.lastHeartbeat - this.dnConf.blockReportInterval;
        }
        this.resetBlockReportTime = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportBadBlocks(ExtendedBlock extendedBlock, String str, StorageType storageType) {
        if (this.bpRegistration == null) {
            return;
        }
        try {
            this.bpNamenode.reportBadBlocks(new LocatedBlock[]{new LocatedBlock(extendedBlock, new DatanodeInfo[]{new DatanodeInfo(this.bpRegistration)}, new String[]{str}, new StorageType[]{storageType})});
        } catch (IOException e) {
            LOG.warn("Failed to report bad block " + extendedBlock + " to namenode :  Exception", e);
        }
    }

    private void reportReceivedDeletedBlocks() throws IOException {
        ArrayList arrayList = new ArrayList(this.pendingIncrementalBRperStorage.size());
        synchronized (this.pendingIncrementalBRperStorage) {
            for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry : this.pendingIncrementalBRperStorage.entrySet()) {
                DatanodeStorage key = entry.getKey();
                PerStoragePendingIncrementalBR value = entry.getValue();
                if (value.getBlockInfoCount() > 0) {
                    arrayList.add(new StorageReceivedDeletedBlocks(key, value.dequeueBlockInfos()));
                }
            }
            this.sendImmediateIBR = false;
        }
        if (arrayList.size() == 0) {
            return;
        }
        boolean z = false;
        try {
            this.bpNamenode.blockReceivedAndDeleted(this.bpRegistration, this.bpos.getBlockPoolId(), (StorageReceivedDeletedBlocks[]) arrayList.toArray(new StorageReceivedDeletedBlocks[arrayList.size()]));
            z = true;
            if (1 == 0) {
                synchronized (this.pendingIncrementalBRperStorage) {
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        StorageReceivedDeletedBlocks storageReceivedDeletedBlocks = (StorageReceivedDeletedBlocks) it.next();
                        this.pendingIncrementalBRperStorage.get(storageReceivedDeletedBlocks.getStorage()).putMissingBlockInfos(storageReceivedDeletedBlocks.getBlocks());
                        this.sendImmediateIBR = true;
                    }
                }
            }
        } catch (Throwable th) {
            if (!z) {
                synchronized (this.pendingIncrementalBRperStorage) {
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        StorageReceivedDeletedBlocks storageReceivedDeletedBlocks2 = (StorageReceivedDeletedBlocks) it2.next();
                        this.pendingIncrementalBRperStorage.get(storageReceivedDeletedBlocks2.getStorage()).putMissingBlockInfos(storageReceivedDeletedBlocks2.getBlocks());
                        this.sendImmediateIBR = true;
                    }
                }
            }
            throw th;
        }
    }

    private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(DatanodeStorage datanodeStorage) {
        PerStoragePendingIncrementalBR perStoragePendingIncrementalBR = this.pendingIncrementalBRperStorage.get(datanodeStorage);
        if (perStoragePendingIncrementalBR == null) {
            perStoragePendingIncrementalBR = new PerStoragePendingIncrementalBR();
            this.pendingIncrementalBRperStorage.put(datanodeStorage, perStoragePendingIncrementalBR);
        }
        return perStoragePendingIncrementalBR;
    }

    void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo receivedDeletedBlockInfo, DatanodeStorage datanodeStorage) {
        Iterator<Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR>> it = this.pendingIncrementalBRperStorage.entrySet().iterator();
        while (it.hasNext() && !it.next().getValue().removeBlockInfo(receivedDeletedBlockInfo)) {
        }
        getIncrementalBRMapForStorage(datanodeStorage).putBlockInfo(receivedDeletedBlockInfo);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo receivedDeletedBlockInfo, String str) {
        synchronized (this.pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(receivedDeletedBlockInfo, this.dn.getFSDataset().getStorage(str));
            this.sendImmediateIBR = true;
            this.pendingIncrementalBRperStorage.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo receivedDeletedBlockInfo, String str) {
        synchronized (this.pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(receivedDeletedBlockInfo, this.dn.getFSDataset().getStorage(str));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void triggerBlockReportForTests() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastBlockReport = 0L;
            this.lastHeartbeat = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastBlockReport == 0) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void triggerHeartbeatForTests() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastHeartbeat = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastHeartbeat == 0) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void triggerDeletionReportForTests() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastDeletedReport = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastDeletedReport == 0) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    @VisibleForTesting
    boolean hasPendingIBR() {
        return this.sendImmediateIBR;
    }

    List<DatanodeCommand> blockReport() throws IOException {
        int i;
        long now = Time.now();
        if (now - this.lastBlockReport <= this.dnConf.blockReportInterval) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        reportReceivedDeletedBlocks();
        this.lastDeletedReport = now;
        long now2 = Time.now();
        Map<DatanodeStorage, BlockListAsLongs> blockReports = this.dn.getFSDataset().getBlockReports(this.bpos.getBlockPoolId());
        int i2 = 0;
        int i3 = 0;
        StorageBlockReport[] storageBlockReportArr = new StorageBlockReport[blockReports.size()];
        for (Map.Entry<DatanodeStorage, BlockListAsLongs> entry : blockReports.entrySet()) {
            BlockListAsLongs value = entry.getValue();
            int i4 = i2;
            i2++;
            storageBlockReportArr[i4] = new StorageBlockReport(entry.getKey(), value.getBlockListAsLongs());
            i3 += value.getNumberOfBlocks();
        }
        long now3 = Time.now();
        if (i3 < this.dnConf.blockReportSplitThreshold) {
            i = 1;
            DatanodeCommand blockReport = this.bpNamenode.blockReport(this.bpRegistration, this.bpos.getBlockPoolId(), storageBlockReportArr);
            if (blockReport != null) {
                arrayList.add(blockReport);
            }
        } else {
            i = i2;
            for (StorageBlockReport storageBlockReport : storageBlockReportArr) {
                DatanodeCommand blockReport2 = this.bpNamenode.blockReport(this.bpRegistration, this.bpos.getBlockPoolId(), new StorageBlockReport[]{storageBlockReport});
                if (blockReport2 != null) {
                    arrayList.add(blockReport2);
                }
            }
        }
        long now4 = Time.now() - now3;
        long j = now3 - now2;
        this.dn.getMetrics().addBlockReport(now4);
        LOG.info("Sent " + i + " blockreports " + i3 + " blocks total. Took " + j + " msec to generate and " + now4 + " msecs for RPC and NN processing.  Got back commands " + (arrayList.size() == 0 ? "none" : Joiner.on(VectorFormat.DEFAULT_SEPARATOR).join((Iterable<?>) arrayList)));
        scheduleNextBlockReport(now);
        if (arrayList.size() == 0) {
            return null;
        }
        return arrayList;
    }

    private void scheduleNextBlockReport(long j) {
        if (!this.resetBlockReportTime) {
            this.lastBlockReport += ((Time.now() - this.lastBlockReport) / this.dnConf.blockReportInterval) * this.dnConf.blockReportInterval;
        } else {
            this.lastBlockReport = j - DFSUtil.getRandom().nextInt((int) this.dnConf.blockReportInterval);
            this.resetBlockReportTime = false;
        }
    }

    DatanodeCommand cacheReport() throws IOException {
        if (this.dn.getFSDataset().getCacheCapacity() == 0) {
            return null;
        }
        DatanodeCommand datanodeCommand = null;
        long monotonicNow = Time.monotonicNow();
        if (monotonicNow - this.lastCacheReport > this.dnConf.cacheReportInterval) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending cacheReport from service actor: " + this);
            }
            this.lastCacheReport = monotonicNow;
            String blockPoolId = this.bpos.getBlockPoolId();
            List<Long> cacheReport = this.dn.getFSDataset().getCacheReport(blockPoolId);
            long monotonicNow2 = Time.monotonicNow();
            datanodeCommand = this.bpNamenode.cacheReport(this.bpRegistration, blockPoolId, cacheReport);
            long j = monotonicNow2 - monotonicNow;
            long monotonicNow3 = Time.monotonicNow() - monotonicNow2;
            this.dn.getMetrics().addCacheReport(monotonicNow3);
            LOG.debug("CacheReport of " + cacheReport.size() + " block(s) took " + j + " msec to generate and " + monotonicNow3 + " msecs for RPC and NN processing");
        }
        return datanodeCommand;
    }

    HeartbeatResponse sendHeartBeat() throws IOException {
        StorageReport[] storageReports = this.dn.getFSDataset().getStorageReports(this.bpos.getBlockPoolId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending heartbeat with " + storageReports.length + " storage reports from service actor: " + this);
        }
        return this.bpNamenode.sendHeartbeat(this.bpRegistration, storageReports, this.dn.getFSDataset().getCacheCapacity(), this.dn.getFSDataset().getCacheUsed(), this.dn.getXmitsInProgress(), this.dn.getXceiverCount(), this.dn.getFSDataset().getNumFailedVolumes());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        if (this.bpThread == null || !this.bpThread.isAlive()) {
            this.bpThread = new Thread(this, formatThreadName());
            this.bpThread.setDaemon(true);
            this.bpThread.start();
        }
    }

    private String formatThreadName() {
        return "DataNode: [" + DataNode.getStorageLocations(this.dn.getConf()).toString() + "]  heartbeating to " + this.nnAddr;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.shouldServiceRun = false;
        if (this.bpThread != null) {
            this.bpThread.interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void join() {
        try {
            if (this.bpThread != null) {
                this.bpThread.join();
            }
        } catch (InterruptedException e) {
        }
    }

    private synchronized void cleanUp() {
        this.shouldServiceRun = false;
        IOUtils.cleanup(LOG, this.bpNamenode);
        this.bpos.shutdownActor(this);
    }

    private void handleRollingUpgradeStatus(HeartbeatResponse heartbeatResponse) {
        RollingUpgradeStatus rollingUpdateStatus = heartbeatResponse.getRollingUpdateStatus();
        if (rollingUpdateStatus == null || rollingUpdateStatus.getBlockPoolId().compareTo(this.bpos.getBlockPoolId()) == 0) {
            this.bpos.signalRollingUpgrade(rollingUpdateStatus != null);
        } else {
            LOG.error("Invalid BlockPoolId " + rollingUpdateStatus.getBlockPoolId() + " in HeartbeatResponse. Expected " + this.bpos.getBlockPoolId());
        }
    }

    private void offerService() throws Exception {
        long now;
        LOG.info("For namenode " + this.nnAddr + " using DELETEREPORT_INTERVAL of " + this.dnConf.deleteReportInterval + " msec  BLOCKREPORT_INTERVAL of " + this.dnConf.blockReportInterval + "msec CACHEREPORT_INTERVAL of " + this.dnConf.cacheReportInterval + "msec Initial delay: " + this.dnConf.initialBlockReportDelay + "msec; heartBeatInterval=" + this.dnConf.heartBeatInterval);
        while (shouldRun()) {
            try {
                now = Time.now();
            } catch (RemoteException e) {
                String className = e.getClassName();
                if (UnregisteredNodeException.class.getName().equals(className) || DisallowedDatanodeException.class.getName().equals(className) || IncorrectVersionException.class.getName().equals(className)) {
                    LOG.warn(this + " is shutting down", e);
                    this.shouldServiceRun = false;
                    return;
                } else {
                    LOG.warn("RemoteException in offerService", e);
                    try {
                        Thread.sleep(Math.min(1000L, this.dnConf.heartBeatInterval));
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e3) {
                LOG.warn("IOException in offerService", e3);
            }
            if (now - this.lastHeartbeat >= this.dnConf.heartBeatInterval) {
                this.lastHeartbeat = now;
                if (!this.dn.areHeartbeatsDisabledForTests()) {
                    HeartbeatResponse sendHeartBeat = sendHeartBeat();
                    if (!$assertionsDisabled && sendHeartBeat == null) {
                        throw new AssertionError();
                    }
                    this.dn.getMetrics().addHeartbeat(Time.now() - now);
                    this.bpos.updateActorStatesFromHeartbeat(this, sendHeartBeat.getNameNodeHaState());
                    this.state = sendHeartBeat.getNameNodeHaState().getState();
                    if (this.state == HAServiceProtocol.HAServiceState.ACTIVE) {
                        handleRollingUpgradeStatus(sendHeartBeat);
                    }
                    long now2 = Time.now();
                    if (processCommand(sendHeartBeat.getCommands())) {
                        long now3 = Time.now();
                        if (now3 - now2 > HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL) {
                            LOG.info("Took " + (now3 - now2) + "ms to process " + sendHeartBeat.getCommands().length + " commands from NN");
                        }
                    }
                }
            }
            if (this.sendImmediateIBR || now - this.lastDeletedReport > this.dnConf.deleteReportInterval) {
                reportReceivedDeletedBlocks();
                this.lastDeletedReport = now;
            }
            List<DatanodeCommand> blockReport = blockReport();
            processCommand(blockReport == null ? null : (DatanodeCommand[]) blockReport.toArray(new DatanodeCommand[blockReport.size()]));
            processCommand(new DatanodeCommand[]{cacheReport()});
            if (this.dn.blockScanner != null) {
                this.dn.blockScanner.addBlockPool(this.bpos.getBlockPoolId());
            }
            long now4 = this.dnConf.heartBeatInterval - (Time.now() - this.lastHeartbeat);
            synchronized (this.pendingIncrementalBRperStorage) {
                if (now4 > 0) {
                    if (!this.sendImmediateIBR) {
                        try {
                            this.pendingIncrementalBRperStorage.wait(now4);
                        } catch (InterruptedException e4) {
                            LOG.warn("BPOfferService for " + this + " interrupted");
                        }
                    }
                }
            }
        }
    }

    void register() throws IOException {
        this.bpRegistration = this.bpos.createRegistration();
        LOG.info(this + " beginning handshake with NN");
        while (shouldRun()) {
            try {
                this.bpRegistration = this.bpNamenode.registerDatanode(this.bpRegistration);
                break;
            } catch (SocketTimeoutException e) {
                LOG.info("Problem connecting to server: " + this.nnAddr);
                sleepAndLogInterrupts(1000, "connecting to server");
            }
        }
        LOG.info("Block pool " + this + " successfully registered with NN");
        this.bpos.registrationSucceeded(this, this.bpRegistration);
        scheduleBlockReport(this.dnConf.initialBlockReportDelay);
    }

    private void sleepAndLogInterrupts(int i, String str) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            LOG.info("BPOfferService " + this + " interrupted while " + str);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info(this + " starting to offer service");
        while (true) {
            try {
                try {
                    try {
                        connectToNNAndHandshake();
                        this.runningState = RunningState.RUNNING;
                        while (shouldRun()) {
                            try {
                                offerService();
                            } catch (Exception e) {
                                LOG.error("Exception in BPOfferService for " + this, e);
                                sleepAndLogInterrupts(5000, "offering service");
                            }
                        }
                        this.runningState = RunningState.EXITED;
                        LOG.warn("Ending block pool service for: " + this);
                        cleanUp();
                        return;
                    } catch (Throwable th) {
                        LOG.warn("Unexpected exception in block pool " + this, th);
                        this.runningState = RunningState.FAILED;
                        LOG.warn("Ending block pool service for: " + this);
                        cleanUp();
                        return;
                    }
                } catch (Throwable th2) {
                    LOG.warn("Ending block pool service for: " + this);
                    cleanUp();
                    throw th2;
                }
            } catch (IOException e2) {
                this.runningState = RunningState.INIT_FAILED;
                if (!shouldRetryInit()) {
                    this.runningState = RunningState.FAILED;
                    LOG.fatal("Initialization failed for " + this + ". Exiting. ", e2);
                    LOG.warn("Ending block pool service for: " + this);
                    cleanUp();
                    return;
                }
                LOG.error("Initialization failed for " + this + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + e2.getLocalizedMessage());
                sleepAndLogInterrupts(5000, "initializing");
            }
        }
    }

    private boolean shouldRetryInit() {
        return shouldRun() && this.bpos.shouldRetryInit();
    }

    private boolean shouldRun() {
        return this.shouldServiceRun && this.dn.shouldRun();
    }

    boolean processCommand(DatanodeCommand[] datanodeCommandArr) {
        if (datanodeCommandArr == null) {
            return true;
        }
        for (DatanodeCommand datanodeCommand : datanodeCommandArr) {
            try {
            } catch (IOException e) {
                LOG.warn("Error processing datanode Command", e);
            }
            if (!this.bpos.processCommandFromActor(datanodeCommand, this)) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void trySendErrorReport(int i, String str) {
        try {
            this.bpNamenode.errorReport(this.bpRegistration, i, str);
        } catch (IOException e) {
            LOG.warn("Error reporting an error to NameNode " + this.nnAddr, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportRemoteBadBlock(DatanodeInfo datanodeInfo, ExtendedBlock extendedBlock) throws IOException {
        this.bpNamenode.reportBadBlocks(new LocatedBlock[]{new LocatedBlock(extendedBlock, new DatanodeInfo[]{datanodeInfo})});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reRegister() throws IOException {
        if (shouldRun()) {
            retrieveNamespaceInfo();
            register();
        }
    }

    static {
        $assertionsDisabled = !BPServiceActor.class.desiredAssertionStatus();
        LOG = DataNode.LOG;
    }
}
