/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.service;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import oadd.com.codahale.metrics.Gauge;
import oadd.com.codahale.metrics.MetricRegistry;
import oadd.com.google.common.base.Stopwatch;
import oadd.io.netty.channel.EventLoopGroup;
import oadd.org.apache.drill.common.AutoCloseables;
import oadd.org.apache.drill.common.config.DrillConfig;
import oadd.org.apache.drill.exec.exception.DrillbitStartupException;
import oadd.org.apache.drill.exec.memory.BufferAllocator;
import oadd.org.apache.drill.exec.metrics.DrillMetrics;
import oadd.org.apache.drill.exec.proto.CoordinationProtos;
import oadd.org.apache.drill.exec.rpc.TransportCheck;
import oadd.org.apache.drill.exec.rpc.control.Controller;
import oadd.org.apache.drill.exec.rpc.control.ControllerImpl;
import oadd.org.apache.drill.exec.rpc.control.WorkEventBus;
import oadd.org.apache.drill.exec.rpc.data.DataConnectionCreator;
import oadd.org.apache.drill.exec.rpc.user.UserServer;
import oadd.org.apache.drill.exec.server.BootStrapContext;
import oadd.org.apache.drill.exec.work.WorkManager;
import oadd.org.apache.drill.exec.work.batch.ControlMessageHandler;
import oadd.org.apache.drill.exec.work.user.UserWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bundles the RPC components built in the constructor: the {@link UserServer},
 * the control-channel {@link Controller} and the data-channel
 * {@link DataConnectionCreator}, together with the child
 * {@link BufferAllocator} handed to each of them.
 *
 * <p>{@link #start()} binds the user server and starts the controller and the
 * data pool, returning the resulting endpoint description. {@link #close()}
 * closes the three components and then the three allocators.
 */
public class ServiceEngine
implements AutoCloseable {
    static final Logger logger = LoggerFactory.getLogger(ServiceEngine.class);

    /** Common prefix of every allocator gauge name registered by this class. */
    private static final String METRIC_PREFIX = "drill.allocator.rpc.";
    /** A single component close slower than this (milliseconds) is reported at INFO level. */
    private static final long SLOW_CLOSE_THRESHOLD_MS = 500L;
    /** Maximum time, in seconds, {@link #close()} waits for the component closes to finish. */
    private static final long CLOSE_TIMEOUT_SECONDS = 3L;

    private final UserServer userServer;
    private final Controller controller;
    private final DataConnectionCreator dataPool;
    private final DrillConfig config;
    /**
     * When {@code true}, {@link #start()} advertises the local host's IP address;
     * otherwise it advertises the canonical host name. Package-private and
     * mutable, so it must be set before {@link #start()} is called to take effect.
     */
    boolean useIP = false;
    private final boolean allowPortHunting;
    private final BufferAllocator userAllocator;
    private final BufferAllocator controlAllocator;
    private final BufferAllocator dataAllocator;

    /**
     * Creates the three child allocators, the user server, the controller and the
     * data connection creator, then registers the allocator gauges.
     *
     * <p>NOTE(review): if a later step in this constructor throws, the allocators
     * (and event loop group) created by the earlier steps are not closed here —
     * confirm whether callers rely on the parent allocator to clean up.
     *
     * @param controlMessageHandler handler passed to the {@link ControllerImpl}
     * @param userWorker            worker passed to the {@link UserServer}
     * @param context               source of the config, root allocator, classpath scan,
     *                              executor and metric registry
     * @param workBus               event bus passed to the {@link DataConnectionCreator}
     * @param bee                   worker bee passed to the {@link DataConnectionCreator}
     * @param allowPortHunting      forwarded to the controller, the data pool and the
     *                              user server's {@code bind} call
     * @throws DrillbitStartupException if one of the components fails to construct
     */
    public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context, WorkEventBus workBus, WorkManager.WorkerBee bee, boolean allowPortHunting) throws DrillbitStartupException {
        this.userAllocator = ServiceEngine.newAllocator(context, "rpc:user", "drill.exec.rpc.user.server.memory.reservation", "drill.exec.rpc.user.server.memory.maximum");
        this.controlAllocator = ServiceEngine.newAllocator(context, "rpc:bit-control", "drill.exec.rpc.bit.server.memory.control.reservation", "drill.exec.rpc.bit.server.memory.control.maximum");
        this.dataAllocator = ServiceEngine.newAllocator(context, "rpc:bit-data", "drill.exec.rpc.bit.server.memory.data.reservation", "drill.exec.rpc.bit.server.memory.data.maximum");
        EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup(context.getConfig().getInt("drill.exec.rpc.user.server.threads"), "UserServer-");
        this.userServer = new UserServer(context.getConfig(), context.getClasspathScan(), this.userAllocator, eventLoopGroup, userWorker, context.getExecutor());
        this.controller = new ControllerImpl(context, controlMessageHandler, this.controlAllocator, allowPortHunting);
        this.dataPool = new DataConnectionCreator(context, this.dataAllocator, workBus, bee, allowPortHunting);
        this.config = context.getConfig();
        this.allowPortHunting = allowPortHunting;
        this.registerMetrics(context.getMetrics());
    }

    /**
     * Registers a "used" and a "peak" gauge for each of the three RPC allocators
     * under {@link #METRIC_PREFIX}.
     *
     * @param registry currently unused; registration goes through the static
     *                 {@code DrillMetrics.register} call
     */
    private void registerMetrics(MetricRegistry registry) {
        ServiceEngine.registerAllocatorGauges(METRIC_PREFIX + "user", this.userAllocator);
        ServiceEngine.registerAllocatorGauges(METRIC_PREFIX + "bit.control", this.controlAllocator);
        ServiceEngine.registerAllocatorGauges(METRIC_PREFIX + "bit.data", this.dataAllocator);
    }

    /**
     * Registers {@code <name>.used} (currently allocated memory) and
     * {@code <name>.peak} (peak memory allocation) gauges for one allocator.
     *
     * @param name      gauge name without the {@code .used}/{@code .peak} suffix
     * @param allocator allocator whose figures the gauges report
     */
    private static void registerAllocatorGauges(String name, final BufferAllocator allocator) {
        DrillMetrics.register(name + ".used", new Gauge<Long>(){

            @Override
            public Long getValue() {
                return allocator.getAllocatedMemory();
            }
        });
        DrillMetrics.register(name + ".peak", new Gauge<Long>(){

            @Override
            public Long getValue() {
                return allocator.getPeakMemoryAllocation();
            }
        });
    }

    /**
     * Creates a child of the context's allocator, reading its initial reservation
     * and maximum allocation from the given config keys.
     *
     * @param context         provides the parent allocator and the config
     * @param name            name given to the child allocator
     * @param initReservation config key holding the initial reservation
     * @param maxAllocation   config key holding the maximum allocation
     * @return the new child allocator
     */
    private static BufferAllocator newAllocator(BootStrapContext context, String name, String initReservation, String maxAllocation) {
        return context.getAllocator().newChildAllocator(name, context.getConfig().getLong(initReservation), context.getConfig().getLong(maxAllocation));
    }

    /**
     * Binds the user server, then starts the controller and the data pool, each of
     * which receives the endpoint built so far and returns an updated one.
     *
     * @return the endpoint returned by the data pool's {@code start} call
     * @throws DrillbitStartupException if a component fails to start
     * @throws UnknownHostException     if the local host cannot be resolved
     */
    public CoordinationProtos.DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException {
        int userPort = this.userServer.bind(this.config.getInt("drill.exec.rpc.user.server.port"), this.allowPortHunting);
        InetAddress localHost = InetAddress.getLocalHost();
        String address = this.useIP ? localHost.getHostAddress() : localHost.getCanonicalHostName();
        CoordinationProtos.DrillbitEndpoint partialEndpoint = CoordinationProtos.DrillbitEndpoint.newBuilder().setAddress(address).setUserPort(userPort).build();
        partialEndpoint = this.controller.start(partialEndpoint);
        return this.dataPool.start(partialEndpoint);
    }

    /** @return the data connection creator built by the constructor */
    public DataConnectionCreator getDataConnectionCreator() {
        return this.dataPool;
    }

    /** @return the controller built by the constructor */
    public Controller getController() {
        return this.controller;
    }

    /**
     * Schedules {@code c.close()} on the given executor. A failure is logged at
     * WARN level rather than propagated, and a close that takes longer than
     * {@link #SLOW_CLOSE_THRESHOLD_MS} is reported at INFO level.
     *
     * @param p    executor that runs the close
     * @param name label used in log messages
     * @param c    resource to close
     */
    private void submit(Executor p, final String name, final AutoCloseable c) {
        p.execute(new Runnable(){

            @Override
            public void run() {
                Stopwatch watch = Stopwatch.createStarted();
                try {
                    c.close();
                }
                catch (Exception e) {
                    // Best effort: one component failing to close must not stop the others.
                    logger.warn("Failure while closing {}.", (Object)name, (Object)e);
                }
                long elapsed = watch.elapsed(TimeUnit.MILLISECONDS);
                if (elapsed > SLOW_CLOSE_THRESHOLD_MS) {
                    logger.info("closed {} in {} ms", (Object)name, (Object)elapsed);
                }
            }
        });
    }

    /**
     * Closes the user server, data pool and controller on a temporary two-thread
     * pool (so at most two of the three closes run concurrently), waits up to
     * {@link #CLOSE_TIMEOUT_SECONDS} seconds for them, and then closes the three
     * allocators regardless of whether the wait succeeded.
     *
     * <p>If the wait times out a warning is logged; if the calling thread is
     * interrupted its interrupt flag is restored. In both cases the already
     * submitted closes keep running on the pool, which has been shut down and
     * accepts no further work.
     *
     * @throws Exception whatever {@code AutoCloseables.close} raises while closing
     *                   the allocators
     */
    @Override
    public void close() throws Exception {
        ExecutorService p = Executors.newFixedThreadPool(2);
        this.submit(p, "userServer", this.userServer);
        this.submit(p, "dataPool", (AutoCloseable)this.dataPool);
        this.submit(p, "controller", (AutoCloseable)this.controller);
        p.shutdown();
        try {
            // awaitTermination returns false on timeout; surface that instead of
            // silently closing allocators underneath a still-running close.
            if (!p.awaitTermination(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn("Timed out after {} seconds waiting for RPC services to close; closing allocators anyway.", (Object)CLOSE_TIMEOUT_SECONDS);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        AutoCloseables.close(this.userAllocator, this.controlAllocator, this.dataAllocator);
    }
}

