package org.apache.drill.exec.physical.impl.partitionsender;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.test.ClusterFixture;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.class */
public class TestPartitionSender extends PlanTestBase {
    private static final int NUM_DEPTS = 40;
    private static final int DRILLBITS_COUNT = 3;
    private static String empTableLocation;
    private static String groupByQuery;
    private static final int NUM_EMPLOYEES = 1000;
    private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(1, 6, NUM_EMPLOYEES, 1.2d);
    private static final UserSession USER_SESSION = UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build();
    public static TemporaryFolder testTempFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender$InjectExceptionTest.class */
    public static class InjectExceptionTest implements PartitionerDecorator.GeneralExecuteIface {
        private InjectExceptionTest() {
        }

        public void execute(Partitioner partitioner) throws IOException {
            partitioner.getStats().addLongStat(PartitionSenderRootExec.Metric.BYTES_SENT, 5L);
            throw new IOException("Test exception handling");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender$MockPartitionSenderRootExec.class */
    public static class MockPartitionSenderRootExec extends PartitionSenderRootExec {
        public MockPartitionSenderRootExec(FragmentContext fragmentContext, RecordBatch recordBatch, HashPartitionSender hashPartitionSender) throws OutOfMemoryException {
            super(fragmentContext, recordBatch, hashPartitionSender);
        }

        public void close() {
        }

        public int getNumberPartitions() {
            return this.numberPartitions;
        }

        public OperatorStats getStats() {
            return this.stats;
        }
    }

    @BeforeClass
    public static void setupTempFolder() throws IOException {
        testTempFolder.create();
    }

    @BeforeClass
    public static void generateTestDataAndQueries() throws Exception {
        empTableLocation = testTempFolder.newFolder().getAbsolutePath();
        for (int i = 0; i < 10; i++) {
            PrintWriter printWriter = new PrintWriter(new File(empTableLocation + File.separator + i + ".json"));
            for (int i2 = i * 100; i2 < (i + 1) * 100; i2++) {
                printWriter.println(String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d }", Integer.valueOf(i2), Integer.valueOf(i2), Integer.valueOf(i2 % NUM_DEPTS)));
            }
            printWriter.close();
        }
        groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation);
    }

    @AfterClass
    public static void cleanupTempFolder() throws IOException {
        testTempFolder.delete();
    }

    @Test
    public void testPartitionSenderCostToThreads() throws Exception {
        VectorContainer vectorContainer = new VectorContainer();
        vectorContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
        SelectionVector4 selectionVector4 = (SelectionVector4) Mockito.mock(SelectionVector4.class, "SelectionVector4");
        Mockito.when(Integer.valueOf(selectionVector4.getCount())).thenReturn(100);
        Mockito.when(Integer.valueOf(selectionVector4.getTotalCount())).thenReturn(100);
        for (int i = 0; i < 100; i++) {
            Mockito.when(Integer.valueOf(selectionVector4.get(i))).thenReturn(Integer.valueOf(i));
        }
        TopNBatch.SimpleRecordBatch simpleRecordBatch = new TopNBatch.SimpleRecordBatch(vectorContainer, selectionVector4, (FragmentContext) null);
        updateTestCluster(DRILLBITS_COUNT, null);
        test("ALTER SESSION SET `planner.slice_target`=1");
        String planInString = getPlanInString("EXPLAIN PLAN FOR " + groupByQuery, ClusterFixture.EXPLAIN_PLAN_JSON);
        System.out.println("Plan: " + planInString);
        DrillbitContext drillbitContext = getDrillbitContext();
        PhysicalPlanReader planReader = drillbitContext.getPlanReader();
        PhysicalPlan readPhysicalPlan = planReader.readPhysicalPlan(planInString);
        Fragment rootFragmentFromPlanString = PopUnitTestBase.getRootFragmentFromPlanString(planReader, planInString);
        PlanningSet planningSet = new PlanningSet();
        FunctionImplementationRegistry functionImplementationRegistry = new FunctionImplementationRegistry(config);
        PARALLELIZER.initFragmentWrappers(rootFragmentFromPlanString, planningSet);
        HashToRandomExchange hashToRandomExchange = null;
        Iterator it = readPhysicalPlan.getSortedOperators(false).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            PhysicalOperator physicalOperator = (PhysicalOperator) it.next();
            if (physicalOperator instanceof HashToRandomExchange) {
                hashToRandomExchange = (HashToRandomExchange) physicalOperator;
                break;
            }
        }
        OptionList optionList = new OptionList();
        optionList.add(OptionValue.createLong(OptionValue.OptionType.SESSION, "planner.slice_target", 1L));
        testThreadsHelper(hashToRandomExchange, drillbitContext, optionList, simpleRecordBatch, functionImplementationRegistry, planReader, planningSet, rootFragmentFromPlanString, 1);
        optionList.clear();
        optionList.add(OptionValue.createLong(OptionValue.OptionType.SESSION, "planner.slice_target", 1L));
        optionList.add(OptionValue.createLong(OptionValue.OptionType.SESSION, "planner.partitioner_sender_max_threads", 10L));
        hashToRandomExchange.setCost(1000.0d);
        testThreadsHelper(hashToRandomExchange, drillbitContext, optionList, simpleRecordBatch, functionImplementationRegistry, planReader, planningSet, rootFragmentFromPlanString, 10);
        optionList.clear();
        optionList.add(OptionValue.createLong(OptionValue.OptionType.SESSION, "planner.slice_target", 1000L));
        optionList.add(OptionValue.createLong(OptionValue.OptionType.SESSION, "planner.partitioner_sender_threads_factor", 2L));
        hashToRandomExchange.setCost(14000.0d);
        testThreadsHelper(hashToRandomExchange, drillbitContext, optionList, simpleRecordBatch, functionImplementationRegistry, planReader, planningSet, rootFragmentFromPlanString, 2);
    }

    private void testThreadsHelper(HashToRandomExchange hashToRandomExchange, DrillbitContext drillbitContext, OptionList optionList, RecordBatch recordBatch, FunctionImplementationRegistry functionImplementationRegistry, PhysicalPlanReader physicalPlanReader, PlanningSet planningSet, Fragment fragment, int i) throws Exception {
        QueryWorkUnit fragments = PARALLELIZER.getFragments(optionList, drillbitContext.getEndpoint(), UserBitShared.QueryId.getDefaultInstance(), drillbitContext.getBits(), physicalPlanReader, fragment, USER_SESSION, Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"));
        List indexOrderedEndpoints = PhysicalOperatorUtil.getIndexOrderedEndpoints(Lists.newArrayList(drillbitContext.getBits()));
        for (BitControl.PlanFragment planFragment : fragments.getFragments()) {
            if (planFragment.getFragmentJson().contains("hash-partition-sender")) {
                MockPartitionSenderRootExec mockPartitionSenderRootExec = null;
                FragmentContext fragmentContext = null;
                try {
                    fragmentContext = new FragmentContext(drillbitContext, planFragment, (UserServer.UserClientConnection) null, functionImplementationRegistry);
                    mockPartitionSenderRootExec = new MockPartitionSenderRootExec(fragmentContext, recordBatch, new HashPartitionSender(planFragment.getHandle().getMajorFragmentId(), hashToRandomExchange, hashToRandomExchange.getExpression(), indexOrderedEndpoints));
                    Assert.assertEquals("Number of threads calculated", i, mockPartitionSenderRootExec.getNumberPartitions());
                    mockPartitionSenderRootExec.createPartitioner();
                    PartitionerDecorator partitioner = mockPartitionSenderRootExec.getPartitioner();
                    Assert.assertNotNull(partitioner);
                    List partitioners = partitioner.getPartitioners();
                    Assert.assertNotNull(partitioners);
                    int i2 = DRILLBITS_COUNT > i ? i : DRILLBITS_COUNT;
                    Assert.assertEquals("Number of partitioners", i2, partitioners.size());
                    for (int i3 = 0; i3 < indexOrderedEndpoints.size(); i3++) {
                        Assert.assertNotNull("PartitionOutgoingBatch", partitioner.getOutgoingBatches(i3));
                    }
                    boolean z = true;
                    int i4 = 0;
                    Iterator it = partitioners.iterator();
                    while (it.hasNext()) {
                        int size = ((Partitioner) it.next()).getOutgoingBatches().size();
                        if (z) {
                            z = false;
                        } else {
                            Assert.assertTrue(Math.abs(size - i4) <= 1);
                        }
                        i4 = size;
                    }
                    mockPartitionSenderRootExec.getStats().startProcessing();
                    try {
                        partitioner.partitionBatch(recordBatch);
                        mockPartitionSenderRootExec.getStats().stopProcessing();
                        if (i2 == 1) {
                            Assert.assertEquals("With single thread parent and child waitNanos should match", ((Partitioner) partitioners.get(0)).getStats().getWaitNanos(), mockPartitionSenderRootExec.getStats().getWaitNanos());
                        }
                        boolean z2 = true;
                        Iterator it2 = partitioner.getPartitioners().iterator();
                        while (it2.hasNext()) {
                            Iterator it3 = ((Partitioner) it2.next()).getOutgoingBatches().iterator();
                            while (it3.hasNext()) {
                                int recordCount = ((PartitionOutgoingBatch) it3.next()).getRecordCount();
                                if (z2) {
                                    Assert.assertEquals("RecordCount", 100L, recordCount);
                                    z2 = false;
                                } else {
                                    Assert.assertEquals("RecordCount", 0L, recordCount);
                                }
                            }
                        }
                        mockPartitionSenderRootExec.getStats().startProcessing();
                        try {
                            try {
                                partitioner.executeMethodLogic(new InjectExceptionTest());
                                Assert.fail("Should throw IOException here");
                                mockPartitionSenderRootExec.getStats().stopProcessing();
                            } catch (IOException e) {
                                UserBitShared.OperatorProfile.Builder newBuilder = UserBitShared.OperatorProfile.newBuilder();
                                mockPartitionSenderRootExec.getStats().addAllMetrics(newBuilder);
                                for (UserBitShared.MetricValue metricValue : newBuilder.getMetricList()) {
                                    if (PartitionSenderRootExec.Metric.BYTES_SENT.metricId() == metricValue.getMetricId()) {
                                        Assert.assertEquals("Should add metricValue irrespective of exception", 5 * i2, metricValue.getLongValue());
                                    }
                                    if (PartitionSenderRootExec.Metric.SENDING_THREADS_COUNT.metricId() == metricValue.getMetricId()) {
                                        Assert.assertEquals(i2, metricValue.getLongValue());
                                    }
                                }
                                Assert.assertEquals(i2 - 1, e.getSuppressed().length);
                                mockPartitionSenderRootExec.getStats().stopProcessing();
                            }
                            mockPartitionSenderRootExec.close();
                            fragmentContext.close();
                        } finally {
                        }
                    } finally {
                    }
                } catch (Throwable th) {
                    mockPartitionSenderRootExec.close();
                    fragmentContext.close();
                    throw th;
                }
            }
        }
    }

    @Test
    public void testAlgorithm() throws Exception {
        Random random = new Random();
        for (int i = 0; i < NUM_EMPLOYEES; i++) {
            int nextInt = random.nextInt(NUM_EMPLOYEES) + 1;
            int nextInt2 = random.nextInt(32) + 1;
            int i2 = nextInt > nextInt2 ? nextInt2 : nextInt;
            int max = Math.max(1, nextInt / i2);
            int i3 = nextInt % i2;
            int i4 = 0;
            for (int i5 = 0; i5 < i2; i5++) {
                i4 += max;
                if (i5 < i3) {
                    i4++;
                }
            }
            Assert.assertTrue("endIndex can not be > outGoingBatchCount", i4 == nextInt);
        }
    }
}
