package org.apache.drill.exec.planner.fragment;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.Materializer;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

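/**
 * Assigns a parallelization width and a list of Drillbit endpoints to every fragment of a physical plan and
 * turns the result into a {@link QueryWorkUnit} made of one root plan fragment plus the remaining plan fragments.
 *
 * <p>Decompiled from input_file:org/apache/drill/exec/planner/fragment/SimpleParallelizer.class
 */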
public class SimpleParallelizer {
    /** Logger shared by the parallelizer and its nested visitor. */
    static final Logger logger = LoggerFactory.getLogger(SimpleParallelizer.class);

    /** Orders {@link EndpointAffinity} values from the highest affinity to the lowest. */
    private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING = Ordering.from(new Comparator<EndpointAffinity>() {
        @Override
        public int compare(EndpointAffinity left, EndpointAffinity right) {
            // Operands are deliberately swapped: a larger affinity must compare as "smaller" to sort first.
            return Double.compare(right.getAffinity(), left.getAffinity());
        }
    });

    /** Divisor applied to a fragment's maximum cost to derive its cost-based width. */
    private final long parallelizationThreshold;

    /** Per-endpoint width limit; multiplied by the number of active endpoints to cap a fragment's width. */
    private final int maxWidthPerNode;

    /** Upper bound on the width of any single fragment. */
    private final int maxGlobalWidth;

    /** Scaling factor for the number of slots handed to endpoints that report an affinity. */
    private final double affinityFactor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/planner/fragment/SimpleParallelizer$CountRequiredFragments.class */
    public static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<BitControl.Collector>, RuntimeException> {
        private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();

        private CountRequiredFragments() {
        }

        public static List<BitControl.Collector> getCollectors(PhysicalOperator physicalOperator) {
            ArrayList newArrayList = Lists.newArrayList();
            physicalOperator.accept(INSTANCE, newArrayList);
            return newArrayList;
        }

        @Override // org.apache.drill.exec.physical.base.AbstractPhysicalVisitor, org.apache.drill.exec.physical.base.PhysicalVisitor
        public Void visitReceiver(Receiver receiver, List<BitControl.Collector> list) throws RuntimeException {
            List<MinorFragmentEndpoint> providingEndpoints = receiver.getProvidingEndpoints();
            ArrayList arrayList = new ArrayList(providingEndpoints.size());
            Iterator<MinorFragmentEndpoint> it = providingEndpoints.iterator();
            while (it.hasNext()) {
                arrayList.add(Integer.valueOf(it.next().getId()));
            }
            list.add(BitControl.Collector.newBuilder().setIsSpooling(receiver.isSpooling()).setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId()).setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange()).addAllIncomingMinorFragment(arrayList).build());
            return null;
        }

        @Override // org.apache.drill.exec.physical.base.AbstractPhysicalVisitor, org.apache.drill.exec.physical.base.PhysicalVisitor
        public Void visitOp(PhysicalOperator physicalOperator, List<BitControl.Collector> list) throws RuntimeException {
            Iterator it = physicalOperator.iterator();
            while (it.hasNext()) {
                ((PhysicalOperator) it.next()).accept(this, list);
            }
            return null;
        }
    }

    /**
     * Creates a parallelizer configured from the options of the given query context.
     *
     * <p>The values are read from {@code ExecConstants.SLICE_TARGET}, {@code ExecConstants.MAX_WIDTH_PER_NODE_KEY},
     * {@code ExecConstants.MAX_WIDTH_GLOBAL_KEY} and {@code ExecConstants.AFFINITY_FACTOR_KEY}.
     *
     * @param queryContext context whose option manager supplies the settings
     */
    public SimpleParallelizer(QueryContext queryContext) {
        OptionManager options = queryContext.getOptions();

        // The threshold is later used as a divisor, so a non-positive slice target is replaced by 1.
        long sliceTarget = options.getOption(ExecConstants.SLICE_TARGET).num_val.longValue();
        this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1L;

        this.maxWidthPerNode = options.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue();
        this.maxGlobalWidth = options.getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue();

        // The field is a double: read the option with doubleValue() so that the fractional part survives.
        // The previous intValue() call truncated e.g. 1.2 to 1.0 and any factor below 1.0 to 0.
        this.affinityFactor = options.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.doubleValue();
    }

    /**
     * Creates a parallelizer from explicit settings. The values are stored exactly as given; no validation
     * or clamping is applied.
     *
     * @param j  parallelization threshold (divisor applied to a fragment's maximum cost)
     * @param i  maximum width per node
     * @param i2 maximum global width
     * @param d  affinity factor
     */
    public SimpleParallelizer(long j, int i, int i2, double d) {
        this.affinityFactor = d;
        this.maxGlobalWidth = i2;
        this.maxWidthPerNode = i;
        this.parallelizationThreshold = j;
    }

    /**
     * Parallelizes the fragment tree rooted at {@code fragment} and produces the resulting work unit.
     *
     * <p>Steps: create wrappers for all fragments, build the dependency graph between the wrappers,
     * parallelize every wrapper returned by the graph construction (each call first parallelizes the
     * wrapper's own dependencies), and finally generate the plan fragments.
     *
     * @param optionList              options serialized into every plan fragment
     * @param drillbitEndpoint        endpoint recorded as foreman in every plan fragment
     * @param queryId                 id of the query being planned
     * @param collection              endpoints available for assignment
     * @param physicalPlanReader      used to serialize operators and options to JSON
     * @param fragment                root fragment of the plan
     * @param userSession             session whose credentials are attached to every plan fragment
     * @param queryContextInformation context information attached to every plan fragment
     * @return the work unit describing the root fragment and all other fragments
     * @throws ExecutionSetupException if parallelization or plan-fragment generation fails
     */
    public QueryWorkUnit getFragments(OptionList optionList, CoordinationProtos.DrillbitEndpoint drillbitEndpoint, UserBitShared.QueryId queryId, Collection<CoordinationProtos.DrillbitEndpoint> collection, PhysicalPlanReader physicalPlanReader, Fragment fragment, UserSession userSession, BitControl.QueryContextInformation queryContextInformation) throws ExecutionSetupException {
        final PlanningSet planningSet = new PlanningSet();
        initFragmentWrappers(fragment, planningSet);

        final Set<Wrapper> graphRoots = constructFragmentDependencyGraph(planningSet);
        for (Wrapper graphRoot : graphRoots) {
            parallelizeFragment(graphRoot, planningSet, collection);
        }

        return generateWorkUnit(optionList, drillbitEndpoint, queryId, physicalPlanReader, fragment, planningSet, userSession, queryContextInformation);
    }

    /**
     * Recursively touches the planning set for the given fragment and for every fragment reachable through
     * its exchange pairs.
     *
     * @param fragment    fragment at which the recursion starts
     * @param planningSet planning set that is queried for each fragment
     */
    @VisibleForTesting
    public void initFragmentWrappers(Fragment fragment, PlanningSet planningSet) {
        // The return value is intentionally ignored.
        // NOTE(review): presumably PlanningSet.get creates the wrapper on first access — confirm.
        planningSet.get(fragment);

        for (Fragment.ExchangeFragmentPair childPair : fragment) {
            initFragmentWrappers(childPair.getNode(), planningSet);
        }
    }

    /**
     * Records the parallelization dependencies between fragment wrappers and returns the wrappers that no
     * other wrapper depends on.
     *
     * @param planningSet planning set containing the wrappers of all fragments
     * @return the wrappers that do not appear in any other wrapper's dependency list
     */
    private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
        // Pass 1: translate each sending exchange's declared dependency direction into a wrapper dependency.
        for (Wrapper sender : planningSet) {
            final Fragment.ExchangeFragmentPair sendingPair = sender.getNode().getSendingExchangePair();
            if (sendingPair == null) {
                continue;
            }

            final Exchange.ParallelizationDependency direction = sendingPair.getExchange().getParallelizationDependency();
            final Wrapper receiver = planningSet.get(sendingPair.getNode());

            if (direction == Exchange.ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
                receiver.addFragmentDependency(sender);
            } else if (direction == Exchange.ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) {
                sender.addFragmentDependency(receiver);
            }
        }

        // Pass 2: start from the full set of wrappers ...
        final Set<Wrapper> independentWrappers = Sets.newHashSet();
        for (Wrapper candidate : planningSet) {
            independentWrappers.add(candidate);
        }

        // Pass 3: ... and drop every wrapper that is listed as a dependency of another one.
        for (Wrapper dependent : planningSet) {
            final List<Wrapper> dependencies = dependent.getFragmentDependencies();
            if (dependencies == null) {
                continue;
            }
            for (Wrapper dependency : dependencies) {
                independentWrappers.remove(dependency);
            }
        }

        return independentWrappers;
    }

    /**
     * Determines the width of a fragment and assigns endpoints to it, after doing the same for all of its
     * dependencies. Does nothing if endpoints were already assigned to the wrapper.
     *
     * @param wrapper     wrapper of the fragment to parallelize
     * @param planningSet planning set handed to the stats collector
     * @param collection  endpoints available for assignment
     * @throws PhysicalOperatorSetupException if the endpoints cannot be selected for the computed width
     */
    private void parallelizeFragment(Wrapper wrapper, PlanningSet planningSet, Collection<CoordinationProtos.DrillbitEndpoint> collection) throws PhysicalOperatorSetupException {
        if (wrapper.isEndpointsAssignmentDone()) {
            return;
        }

        // Dependencies must be parallelized before this fragment.
        final List<Wrapper> dependencies = wrapper.getFragmentDependencies();
        if (dependencies != null) {
            for (Wrapper dependency : dependencies) {
                parallelizeFragment(dependency, planningSet, collection);
            }
        }

        // Populate the wrapper's stats by visiting the fragment's operator tree.
        wrapper.getNode().getRoot().accept(new StatsCollector(planningSet), wrapper);
        final Stats stats = wrapper.getStats();
        final ParallelizationInfo info = stats.getParallelizationInfo();

        // Cost-based width, then successively clamped by the configured and operator-imposed limits.
        int width = (int) Math.ceil(stats.getMaxCost() / this.parallelizationThreshold);
        width = Math.min(width, Math.min(info.getMaxWidth(), this.maxGlobalWidth));
        width = Math.min(width, this.maxWidthPerNode * collection.size());
        width = Math.max(info.getMinWidth(), width);
        // The operator-imposed maximum wins over the minimum, and the width is never below one.
        width = Math.min(info.getMaxWidth(), width);
        width = Math.max(1, width);
        wrapper.setWidth(width);

        final List<CoordinationProtos.DrillbitEndpoint> assigned = findEndpoints(collection, info.getEndpointAffinityMap(), wrapper.getWidth());
        wrapper.assignEndpoints(assigned);
    }

    /**
     * Selects {@code i} endpoints for a fragment, giving endpoints with an affinity the first slots and
     * filling the remainder round-robin from the other endpoints.
     *
     * @param collection endpoints available for assignment
     * @param map        affinity information keyed by endpoint; may be empty
     * @param i          number of endpoints to select (the fragment's width)
     * @return list of selected endpoints; an endpoint may appear more than once
     * @throws PhysicalOperatorSetupException if {@code i} is smaller than the number of endpoints whose
     *         assignment is required
     */
    private List<CoordinationProtos.DrillbitEndpoint> findEndpoints(Collection<CoordinationProtos.DrillbitEndpoint> collection, Map<CoordinationProtos.DrillbitEndpoint, EndpointAffinity> map, int i) throws PhysicalOperatorSetupException {
        final List<CoordinationProtos.DrillbitEndpoint> selected = Lists.newArrayList();

        if (map.size() > 0) {
            // Highest affinity first.
            final List<EndpointAffinity> byAffinity = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(map.values());

            // Count the leading run of endpoints whose assignment is required; stop at the first one that is not.
            int mandatoryCount = 0;
            for (EndpointAffinity affinity : byAffinity) {
                if (!affinity.isAssignmentRequired()) {
                    break;
                }
                mandatoryCount++;
            }

            if (i < mandatoryCount) {
                throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + i + ") is less than the number of mandatory nodes (" + mandatoryCount + " nodes with +INFINITE affinity).");
            }

            // Slots reserved for endpoints with affinity: at least one per such endpoint, scaled by the
            // affinity factor, never fewer than the mandatory endpoints and never more than the width.
            int affinedSlots = Math.max(1, (int) ((this.affinityFactor * i) / collection.size())) * byAffinity.size();
            affinedSlots = Math.max(affinedSlots, mandatoryCount);
            affinedSlots = Math.min(affinedSlots, i);

            final Iterator<EndpointAffinity> affinedCycle = Iterators.cycle(byAffinity);
            while (selected.size() < affinedSlots) {
                selected.add(affinedCycle.next().getEndpoint());
            }
        }

        if (selected.size() < i) {
            final Set<CoordinationProtos.DrillbitEndpoint> withAffinity = map.keySet();

            // Endpoints that carry no affinity; a mutable copy is needed because the list is shuffled below.
            final List<CoordinationProtos.DrillbitEndpoint> withoutAffinity;
            if (map.size() > 0) {
                withoutAffinity = Lists.newArrayList();
                for (CoordinationProtos.DrillbitEndpoint endpoint : collection) {
                    if (!withAffinity.contains(endpoint)) {
                        withoutAffinity.add(endpoint);
                    }
                }
            } else {
                withoutAffinity = Lists.newArrayList(collection);
            }

            // Round-robin with a random starting order; fall back to the affinity endpoints if no others exist.
            Collections.shuffle(withoutAffinity, ThreadLocalRandom.current());
            final Iterable<CoordinationProtos.DrillbitEndpoint> fillPool = withoutAffinity.size() > 0 ? withoutAffinity : withAffinity;
            final Iterator<CoordinationProtos.DrillbitEndpoint> fillCycle = Iterators.cycle(fillPool);
            while (selected.size() < i) {
                selected.add(fillCycle.next());
            }
        }

        return selected;
    }

    /**
     * Materializes every minor fragment of every wrapper in the planning set into a
     * {@link BitControl.PlanFragment} and bundles them into a {@link QueryWorkUnit}.
     *
     * @param optionList              options serialized to JSON into each plan fragment
     * @param drillbitEndpoint        endpoint recorded as foreman
     * @param queryId                 id of the query
     * @param physicalPlanReader      serializer for operators and options
     * @param fragment                root fragment; its wrapper must have width one
     * @param planningSet             wrappers of all fragments, with endpoints already assigned
     * @param userSession             session providing the credentials
     * @param queryContextInformation context information attached to each plan fragment
     * @return work unit holding the root operator, the root plan fragment and all other plan fragments
     * @throws ExecutionSetupException if the root fragment's width is not one, or if JSON serialization
     *         fails (both reported as {@link ForemanSetupException})
     */
    private QueryWorkUnit generateWorkUnit(OptionList optionList, CoordinationProtos.DrillbitEndpoint drillbitEndpoint, UserBitShared.QueryId queryId, PhysicalPlanReader physicalPlanReader, Fragment fragment, PlanningSet planningSet, UserSession userSession, BitControl.QueryContextInformation queryContextInformation) throws ExecutionSetupException {
        final List<BitControl.PlanFragment> remoteFragments = Lists.newArrayList();
        BitControl.PlanFragment rootPlanFragment = null;
        FragmentRoot rootOperator = null;

        for (Wrapper wrapper : planningSet) {
            final Fragment node = wrapper.getNode();
            final PhysicalOperator physicalRoot = node.getRoot();
            final boolean isRootFragment = (fragment == node);

            if (isRootFragment && wrapper.getWidth() != 1) {
                throw new ForemanSetupException(String.format("Failure while trying to setup fragment. The root fragment must always have parallelization one. In the current case, the width was set to %d.", wrapper.getWidth()));
            }

            // A fragment that receives from no exchange is a leaf.
            final boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0;

            for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
                final Materializer.IndexedFragmentNode indexedNode = new Materializer.IndexedFragmentNode(minorFragmentId, wrapper);
                // Reset before materializing; the allocation values are read back from the wrapper below.
                // NOTE(review): presumably the materializer accumulates allocations on the wrapper — confirm.
                wrapper.resetAllocation();
                final PhysicalOperator materialized = (PhysicalOperator) physicalRoot.accept(Materializer.INSTANCE, indexedNode);
                Preconditions.checkArgument(materialized instanceof FragmentRoot);
                final FragmentRoot materializedRoot = (FragmentRoot) materialized;

                try {
                    final BitControl.PlanFragment.Builder builder = BitControl.PlanFragment.newBuilder();
                    builder.setForeman(drillbitEndpoint);
                    builder.setFragmentJson(physicalPlanReader.writeJson(materializedRoot));
                    builder.setHandle(ExecProtos.FragmentHandle.newBuilder()
                            .setMajorFragmentId(wrapper.getMajorFragmentId())
                            .setMinorFragmentId(minorFragmentId)
                            .setQueryId(queryId)
                            .build());
                    builder.setAssignment(wrapper.getAssignedEndpoint(minorFragmentId));
                    builder.setLeafFragment(isLeafFragment);
                    builder.setContext(queryContextInformation);
                    builder.setMemInitial(wrapper.getInitialAllocation());
                    builder.setMemMax(wrapper.getMaxAllocation());
                    builder.setOptionsJson(physicalPlanReader.writeJson(optionList));
                    builder.setCredentials(userSession.getCredentials());
                    builder.addAllCollector(CountRequiredFragments.getCollectors(materializedRoot));
                    final BitControl.PlanFragment planFragment = builder.build();

                    if (!isRootFragment) {
                        logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(planFragment.toString()));
                        remoteFragments.add(planFragment);
                    } else {
                        logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(planFragment.toString()));
                        rootPlanFragment = planFragment;
                        rootOperator = materializedRoot;
                    }
                } catch (JsonProcessingException e) {
                    throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
                }
            }
        }

        return new QueryWorkUnit(rootOperator, rootPlanFragment, remoteFragments);
    }
}
