/*
 * Decompiled with CFR 0.152.
 */
package tachyon.worker;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.UnderFileSystem;
import tachyon.UnderFileSystemHdfs;
import tachyon.Version;
import tachyon.conf.CommonConf;
import tachyon.conf.WorkerConf;
import tachyon.org.apache.thrift.server.TServer;
import tachyon.org.apache.thrift.server.TThreadedSelectorServer;
import tachyon.org.apache.thrift.transport.TNonblockingServerSocket;
import tachyon.org.apache.thrift.transport.TTransportException;
import tachyon.thrift.Command;
import tachyon.thrift.NetAddress;
import tachyon.thrift.WorkerService;
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.worker.BlocksLocker;
import tachyon.worker.DataServer;
import tachyon.worker.WorkerServiceHandler;
import tachyon.worker.WorkerStorage;
import tachyon.worker.netty.NettyDataServer;
import tachyon.worker.nio.NIODataServer;

/**
 * Entry point and lifecycle owner of a Tachyon worker process.
 *
 * <p>An instance wires together a {@link WorkerStorage}, a Thrift {@link TServer} serving a
 * {@link WorkerServiceHandler}, a {@link DataServer}, and a heartbeat thread. The heartbeat
 * thread executes {@link #run()}, which periodically calls the storage's {@code heartbeat()}
 * and acts on the {@link Command} it returns.
 *
 * <p>Instances are obtained through the {@code createWorker} factories, started with
 * {@link #start()} and shut down with {@link #stop()}.
 */
public class TachyonWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

    /** Address of the master this worker reports to. */
    private final InetSocketAddress mMasterAddress;
    /** Host, Thrift port and data port of this worker, as handed to the worker storage. */
    private final NetAddress mWorkerAddress;
    /** Thrift server exposing {@link #mWorkerServiceHandler}. */
    private TServer mServer;
    /** Socket the Thrift server listens on; closed explicitly in {@link #stop()}. */
    private TNonblockingServerSocket mServerTNonblockingServerSocket;
    private final WorkerStorage mWorkerStorage;
    private final WorkerServiceHandler mWorkerServiceHandler;
    private final DataServer mDataServer;
    /** Thread that executes {@link #run()}. */
    private final Thread mHeartbeatThread;
    /** Set by {@link #stop()} to make the heartbeat loop exit. */
    private volatile boolean mStop = false;
    /** Port reported by the Thrift server socket after it has been created. */
    private final int mPort;
    /** Port reported by the data server after it has been created. */
    private final int mDataPort;
    /** Single-threaded daemon pool handed to the {@link WorkerStorage}. */
    private final ExecutorService mExecutorService = Executors.newFixedThreadPool(1, ThreadFactoryUtils.daemon("heartbeat-worker-%d"));

    /**
     * Creates a worker from already-constructed socket addresses.
     *
     * @param masterAddress address of the master
     * @param workerAddress address the worker's Thrift server listens on
     * @param dataPort port requested for the data server
     * @param selectorThreads number of selector threads for the Thrift server
     * @param acceptQueueSizePerThreads accept queue size per selector thread
     * @param workerThreads number of Thrift worker threads
     * @return the new worker; it is not started
     */
    public static synchronized TachyonWorker createWorker(InetSocketAddress masterAddress, InetSocketAddress workerAddress, int dataPort, int selectorThreads, int acceptQueueSizePerThreads, int workerThreads) {
        return new TachyonWorker(masterAddress, workerAddress, dataPort, selectorThreads, acceptQueueSizePerThreads, workerThreads);
    }

    /**
     * Creates a worker from {@code host:port} strings.
     *
     * @param masterAddress master address in {@code host:port} form
     * @param workerAddress worker address in {@code host:port} form
     * @param dataPort port requested for the data server
     * @param selectorThreads number of selector threads for the Thrift server
     * @param acceptQueueSizePerThreads accept queue size per selector thread
     * @param workerThreads number of Thrift worker threads
     * @return the new worker; it is not started
     * @throws IllegalArgumentException if either address lacks a {@code :port} part or the port
     *         is not an integer ({@link NumberFormatException})
     */
    public static synchronized TachyonWorker createWorker(String masterAddress, String workerAddress, int dataPort, int selectorThreads, int acceptQueueSizePerThreads, int workerThreads) {
        InetSocketAddress master = TachyonWorker.parseAddress(masterAddress);
        InetSocketAddress worker = TachyonWorker.parseAddress(workerAddress);
        return new TachyonWorker(master, worker, dataPort, selectorThreads, acceptQueueSizePerThreads, workerThreads);
    }

    /**
     * Parses a {@code host:port} string into a socket address. Only the first two
     * colon-separated parts are used; any further parts are ignored.
     *
     * @throws IllegalArgumentException if there is no port part or it is not an integer
     */
    private static InetSocketAddress parseAddress(String hostAndPort) {
        String[] parts = hostAndPort.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected an address of the form host:port but got '" + hostAndPort + "'");
        }
        return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
    }

    /**
     * Determines the master location to use, preferring the command line over the configuration.
     *
     * <p>With no arguments, the configured {@code MASTER_HOSTNAME:MASTER_PORT} is returned.
     * Otherwise {@code args[0]} is used, with the configured master port appended when it has no
     * {@code :}; a warning is logged if the result differs from the configured location.
     */
    private static String getMasterLocation(String[] args) {
        WorkerConf wConf = WorkerConf.get();
        String confFileMasterLoc = wConf.MASTER_HOSTNAME + ":" + wConf.MASTER_PORT;
        if (args.length < 1) {
            return confFileMasterLoc;
        }
        String masterLocation = args[0];
        if (!masterLocation.contains(":")) {
            masterLocation = masterLocation + ":" + wConf.MASTER_PORT;
        }
        if (!masterLocation.equals(confFileMasterLoc)) {
            LOG.warn("Master Address in configuration file(" + confFileMasterLoc + ") is different " + "from the command line one(" + masterLocation + ").");
        }
        return masterLocation;
    }

    /**
     * Command-line entry point: creates a worker from the configuration and starts it.
     *
     * <p>Accepts at most one argument, the master location. Exits with status -1 on bad usage or
     * when the worker terminates with an exception.
     *
     * @param args optional single element {@code <MasterHost:Port>}
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    public static void main(String[] args) throws UnknownHostException {
        if (args.length > 1) {
            // The main class is this one; the usage line must name it so it can be copy-pasted.
            LOG.info("Usage: java -cp target/tachyon-" + Version.VERSION + "-jar-with-dependencies.jar " + "tachyon.worker.TachyonWorker [<MasterHost:Port>]");
            System.exit(-1);
        }
        WorkerConf wConf = WorkerConf.get();
        String resolvedWorkerHost = NetworkUtils.getLocalHostName();
        LOG.info("Resolved local TachyonWorker host to " + resolvedWorkerHost);
        try {
            TachyonWorker worker = TachyonWorker.createWorker(TachyonWorker.getMasterLocation(args), resolvedWorkerHost + ":" + wConf.PORT, wConf.DATA_PORT, wConf.SELECTOR_THREADS, wConf.QUEUE_SIZE_PER_SELECTOR, wConf.SERVER_THREADS);
            worker.start();
        }
        catch (Exception e) {
            LOG.error("Uncaught exception terminating worker", e);
            System.exit(-1);
        }
    }

    /**
     * Builds the worker's storage, data server, heartbeat thread and Thrift server. Nothing is
     * served until {@link #start()} is called.
     *
     * <p>If the Thrift server socket cannot be created, the already-created data server is closed
     * (best effort) before the failure is rethrown, so it is not left behind.
     */
    private TachyonWorker(InetSocketAddress masterAddress, InetSocketAddress workerAddress, int dataPort, int selectorThreads, int acceptQueueSizePerThreads, int workerThreads) {
        CommonConf.assertValidPort(masterAddress);
        CommonConf.assertValidPort(workerAddress);
        CommonConf.assertValidPort(dataPort);
        this.mMasterAddress = masterAddress;
        this.mWorkerStorage = new WorkerStorage(this.mMasterAddress, this.mExecutorService);
        this.mWorkerServiceHandler = new WorkerServiceHandler(this.mWorkerStorage);
        InetSocketAddress dataAddress = new InetSocketAddress(workerAddress.getHostName(), dataPort);
        BlocksLocker blockLocker = new BlocksLocker(this.mWorkerStorage, -1);
        this.mDataServer = this.createDataServer(dataAddress, blockLocker);
        // Ask the server for its port rather than trusting dataPort.
        this.mDataPort = this.mDataServer.getPort();
        this.mHeartbeatThread = new Thread(this);
        try {
            LOG.info("Tachyon Worker version " + Version.VERSION + " tries to start @ " + workerAddress);
            WorkerService.Processor<WorkerServiceHandler> processor = new WorkerService.Processor<WorkerServiceHandler>(this.mWorkerServiceHandler);
            this.mServerTNonblockingServerSocket = new TNonblockingServerSocket(workerAddress);
            this.mPort = NetworkUtils.getPort(this.mServerTNonblockingServerSocket);
            this.mServer = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(this.mServerTNonblockingServerSocket).processor(processor).selectorThreads(selectorThreads).acceptQueueSizePerThread(acceptQueueSizePerThreads).workerThreads(workerThreads));
        }
        catch (TTransportException e) {
            LOG.error(e.getMessage(), e);
            try {
                // The data server was created above; release it since construction is aborting.
                this.mDataServer.close();
            }
            catch (Exception closeFailure) {
                LOG.warn("Failed to close the data server after the worker server failed to start", closeFailure);
            }
            throw Throwables.propagate(e);
        }
        this.mWorkerAddress = new NetAddress(workerAddress.getAddress().getCanonicalHostName(), this.mPort, this.mDataPort);
        this.mWorkerStorage.initialize(this.mWorkerAddress);
    }

    /**
     * Instantiates the data server implementation selected by {@code WorkerConf.NETWORK_TYPE}.
     *
     * @throws AssertionError if the configured network type is neither NIO nor NETTY
     */
    private DataServer createDataServer(InetSocketAddress dataAddress, BlocksLocker blockLocker) {
        switch (WorkerConf.get().NETWORK_TYPE) {
            case NIO: {
                return new NIODataServer(dataAddress, blockLocker);
            }
            case NETTY: {
                return new NettyDataServer(dataAddress, blockLocker);
            }
        }
        throw new AssertionError("Unknown network type: " + WorkerConf.get().NETWORK_TYPE);
    }

    /**
     * @return the port reported by the data server
     */
    public int getDataPort() {
        return this.mDataPort;
    }

    /**
     * @return the port reported by the Thrift server socket
     */
    public int getMetaPort() {
        return this.mPort;
    }

    /**
     * @return the handler served by this worker's Thrift server
     */
    WorkerServiceHandler getWorkerServiceHandler() {
        return this.mWorkerServiceHandler;
    }

    /**
     * Logs in to the under file system when both a keytab and a principal are configured and the
     * under file system is an {@link UnderFileSystemHdfs}; otherwise does nothing.
     *
     * @throws IOException if obtaining the under file system or logging in fails
     */
    private void login() throws IOException {
        WorkerConf wConf = WorkerConf.get();
        if (wConf.KEYTAB == null || wConf.PRINCIPAL == null) {
            return;
        }
        UnderFileSystem ufs = UnderFileSystem.get(CommonConf.get().UNDERFS_ADDRESS);
        if (ufs instanceof UnderFileSystemHdfs) {
            ((UnderFileSystemHdfs)ufs).login(wConf.KEYTAB_KEY, wConf.KEYTAB, wConf.PRINCIPAL_KEY, wConf.PRINCIPAL, NetworkUtils.getFqdnHost(this.mWorkerAddress));
        }
    }

    /**
     * Heartbeat loop, executed on the heartbeat thread until {@link #stop()} sets the stop flag.
     *
     * <p>Each iteration waits out the remainder of the heartbeat interval, sends a heartbeat,
     * executes the returned command (if any) and then calls {@code checkStatus()} on the storage.
     * When a heartbeat fails with an {@link IOException}, the master client is reset and the loop
     * retries after one second.
     *
     * @throws RuntimeException if no heartbeat has succeeded for {@code HEARTBEAT_TIMEOUT_MS},
     *         or if the master sends a command type this worker does not recognize
     */
    @Override
    public void run() {
        long lastHeartbeatMs = System.currentTimeMillis();
        while (!this.mStop) {
            long intervalMs = WorkerConf.get().TO_MASTER_HEARTBEAT_INTERVAL_MS;
            long diff = System.currentTimeMillis() - lastHeartbeatMs;
            if (diff < intervalMs) {
                LOG.debug("Heartbeat process takes {} ms.", diff);
                CommonUtils.sleepMs(LOG, intervalMs - diff);
            } else {
                // The previous iteration overran the interval; heartbeat again immediately.
                LOG.error("Heartbeat process takes " + diff + " ms.");
            }

            // Stays null when the heartbeat fails, so no stale command is executed.
            Command cmd = null;
            try {
                cmd = this.mWorkerStorage.heartbeat();
                lastHeartbeatMs = System.currentTimeMillis();
            }
            catch (IOException e) {
                LOG.error(e.getMessage(), e);
                this.mWorkerStorage.resetMasterClient();
                CommonUtils.sleepMs(LOG, 1000L);
                long sinceLastHeartbeatMs = System.currentTimeMillis() - lastHeartbeatMs;
                if (sinceLastHeartbeatMs >= WorkerConf.get().HEARTBEAT_TIMEOUT_MS) {
                    throw new RuntimeException("Heartbeat timeout " + sinceLastHeartbeatMs + "ms");
                }
            }

            if (cmd != null) {
                this.executeCommand(cmd);
            }
            this.mWorkerStorage.checkStatus();
        }
    }

    /**
     * Executes a single command returned by a heartbeat.
     *
     * <p>{@code Register} re-registers the storage and {@code Free} frees the blocks listed in the
     * command; {@code Unknown}, {@code Nothing} and {@code Delete} are only logged.
     *
     * @throws RuntimeException if the command type is none of the above
     */
    private void executeCommand(Command cmd) {
        switch (cmd.mCommandType) {
            case Unknown: {
                LOG.error("Unknown command: " + cmd);
                break;
            }
            case Nothing: {
                LOG.debug("Nothing command: {}", cmd);
                break;
            }
            case Register: {
                LOG.info("Register command: " + cmd);
                this.mWorkerStorage.register();
                break;
            }
            case Free: {
                this.mWorkerStorage.freeBlocks(cmd.mData);
                LOG.info("Free command: " + cmd);
                break;
            }
            case Delete: {
                LOG.info("Delete command: " + cmd);
                break;
            }
            default: {
                throw new RuntimeException("Un-recognized command from master " + cmd.toString());
            }
        }
    }

    /**
     * Logs in (if configured), starts the heartbeat thread and then serves Thrift requests on the
     * calling thread until the server stops.
     *
     * @throws IOException if the login fails
     */
    public void start() throws IOException {
        this.login();
        this.mHeartbeatThread.start();
        LOG.info("The worker server started @ " + this.mWorkerAddress);
        this.mServer.serve();
        LOG.info("The worker server ends @ " + this.mWorkerAddress);
    }

    /**
     * Stops the worker and waits until the data server is closed, the Thrift server is no longer
     * serving and the heartbeat thread has finished.
     *
     * @throws IOException if closing a component fails
     * @throws InterruptedException if interrupted while joining the heartbeat thread
     */
    public void stop() throws IOException, InterruptedException {
        this.mStop = true;
        this.mWorkerStorage.stop();
        this.mDataServer.close();
        this.mServer.stop();
        this.mServerTNonblockingServerSocket.close();
        this.mExecutorService.shutdown();
        while (!this.mDataServer.isClosed() || this.mServer.isServing() || this.mHeartbeatThread.isAlive()) {
            // Keep nudging the Thrift server until every component reports it is down.
            this.mServer.stop();
            this.mServerTNonblockingServerSocket.close();
            // Pass the class logger, as every other sleepMs call here does; a null logger
            // presumably cannot report an interrupted sleep (would fail instead) - TODO confirm.
            CommonUtils.sleepMs(LOG, 100L);
        }
        this.mHeartbeatThread.join();
    }
}

