package org.apache.drill.exec.work;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/WorkManager.class */
public class WorkManager implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
    private final BootStrapContext bContext;
    private DrillbitContext dContext;
    private final Executor executor;
    private static final int STATUS_PERIOD_SECONDS = 5;
    private final Map<ExecProtos.FragmentHandle, FragmentExecutor> runningFragments = new ConcurrentHashMap();
    private final ConcurrentMap<UserBitShared.QueryId, Foreman> queries = Maps.newConcurrentMap();
    private ExtendedLatch exitLatch = null;
    private final WorkerBee bee = new WorkerBee();
    private final WorkEventBus workBus = new WorkEventBus();
    private final ControlMessageHandler controlMessageWorker = new ControlMessageHandler(this.bee);
    private final UserWorker userWorker = new UserWorker(this.bee);
    private final StatusThread statusThread = new StatusThread();
    private final DataResponseHandler dataHandler = new DataResponseHandlerImpl(this.bee);

    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$StatusThread.class */
    private class StatusThread extends Thread {
        public StatusThread() {
            setDaemon(true);
            setName("WorkManager.StatusThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                Controller controller = WorkManager.this.dContext.getController();
                ArrayList newArrayList = Lists.newArrayList();
                for (FragmentExecutor fragmentExecutor : WorkManager.this.runningFragments.values()) {
                    BitControl.FragmentStatus status = fragmentExecutor.getStatus();
                    if (status != null) {
                        newArrayList.add(controller.getTunnel(fragmentExecutor.getContext().getForemanEndpoint()).sendFragmentStatus(status));
                    }
                }
                Iterator it = newArrayList.iterator();
                while (it.hasNext()) {
                    try {
                        ((DrillRpcFuture) it.next()).checkedGet();
                    } catch (RpcException e) {
                        WorkManager.logger.info("Failure while sending intermediate fragment status to Foreman", e);
                    }
                }
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$WorkerBee.class */
    public class WorkerBee {
        public WorkerBee() {
        }

        public void addNewForeman(Foreman foreman) {
            WorkManager.this.queries.put(foreman.getQueryId(), foreman);
            WorkManager.this.executor.execute(foreman);
        }

        public void retireForeman(Foreman foreman) {
            Preconditions.checkNotNull(foreman);
            UserBitShared.QueryId queryId = foreman.getQueryId();
            if (!WorkManager.this.queries.remove(queryId, foreman)) {
                WorkManager.logger.warn("Couldn't find retiring Foreman for query " + queryId);
            }
            WorkManager.this.indicateIfSafeToExit();
        }

        public Foreman getForemanForQueryId(UserBitShared.QueryId queryId) {
            return (Foreman) WorkManager.this.queries.get(queryId);
        }

        public DrillbitContext getContext() {
            return WorkManager.this.dContext;
        }

        public void addFragmentRunner(FragmentExecutor fragmentExecutor) {
            final ExecProtos.FragmentHandle handle = fragmentExecutor.getContext().getHandle();
            WorkManager.this.runningFragments.put(handle, fragmentExecutor);
            WorkManager.this.executor.execute(new SelfCleaningRunnable(fragmentExecutor) { // from class: org.apache.drill.exec.work.WorkManager.WorkerBee.1
                protected void cleanup() {
                    WorkManager.this.runningFragments.remove(handle);
                    WorkManager.this.indicateIfSafeToExit();
                }
            });
        }

        public void startFragmentPendingRemote(FragmentManager fragmentManager) {
            final ExecProtos.FragmentHandle handle = fragmentManager.getHandle();
            FragmentExecutor runnable = fragmentManager.getRunnable();
            if (runnable == null) {
                return;
            }
            WorkManager.this.runningFragments.put(handle, runnable);
            WorkManager.this.executor.execute(new SelfCleaningRunnable(runnable) { // from class: org.apache.drill.exec.work.WorkManager.WorkerBee.2
                protected void cleanup() {
                    WorkManager.this.runningFragments.remove(handle);
                    WorkManager.this.workBus.removeFragmentManager(handle);
                    WorkManager.this.indicateIfSafeToExit();
                }
            });
        }

        public FragmentExecutor getFragmentRunner(ExecProtos.FragmentHandle fragmentHandle) {
            return (FragmentExecutor) WorkManager.this.runningFragments.get(fragmentHandle);
        }
    }

    public WorkManager(BootStrapContext bootStrapContext) {
        this.bContext = bootStrapContext;
        this.executor = bootStrapContext.getExecutor();
    }

    public void start(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, Controller controller, DataConnectionCreator dataConnectionCreator, ClusterCoordinator clusterCoordinator, PStoreProvider pStoreProvider) {
        this.dContext = new DrillbitContext(drillbitEndpoint, this.bContext, clusterCoordinator, controller, dataConnectionCreator, this.workBus, pStoreProvider);
        this.statusThread.start();
        try {
            this.dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.running_fragments." + this.dContext.getEndpoint().getUserPort(), new String[0]), new Gauge<Integer>() { // from class: org.apache.drill.exec.work.WorkManager.1
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Integer m1284getValue() {
                    return Integer.valueOf(WorkManager.this.runningFragments.size());
                }
            });
        } catch (IllegalArgumentException e) {
            logger.warn("Exception while registering metrics", e);
        }
    }

    public Executor getExecutor() {
        return this.executor;
    }

    public WorkEventBus getWorkBus() {
        return this.workBus;
    }

    public DataResponseHandler getDataHandler() {
        return this.dataHandler;
    }

    public ControlMessageHandler getControlMessageHandler() {
        return this.controlMessageWorker;
    }

    public UserWorker getUserWorker() {
        return this.userWorker;
    }

    public WorkerBee getBee() {
        return this.bee;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.statusThread.interrupt();
        if (this.runningFragments.isEmpty()) {
            return;
        }
        logger.warn("Closing WorkManager but there are {} running fragments.", Integer.valueOf(this.runningFragments.size()));
        if (logger.isDebugEnabled()) {
            for (ExecProtos.FragmentHandle fragmentHandle : this.runningFragments.keySet()) {
                logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(fragmentHandle), this.runningFragments.get(fragmentHandle).getStatus());
            }
        }
    }

    public DrillbitContext getContext() {
        return this.dContext;
    }

    public void waitToExit() {
        synchronized (this) {
            if (this.queries.isEmpty() && this.runningFragments.isEmpty()) {
                return;
            }
            this.exitLatch = new ExtendedLatch();
            this.exitLatch.awaitUninterruptibly(5000L);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void indicateIfSafeToExit() {
        synchronized (this) {
            if (this.exitLatch != null && this.queries.isEmpty() && this.runningFragments.isEmpty()) {
                this.exitLatch.countDown();
            }
        }
    }
}
