/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.partitionsender;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.Random;
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionOutgoingBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.Partitioner;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public class TestPartitionSender
extends PlanTestBase {
    // Parallelizer shared by the tests: it initialises the fragment wrappers and builds the
    // fragments of the QueryWorkUnit inspected by testThreadsHelper.
    // NOTE(review): the meaning of the constructor arguments (1L, 6, 1000, 1.2) is not visible
    // here -- confirm against SimpleParallelizer before changing them.
    private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(1L, 6, 1000, 1.2);
    // Session for the user "foo", handed to the parallelizer when fragments are generated.
    private static final UserSession USER_SESSION = UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build();
    // Temporary folder holding the generated JSON table; created in setupTempFolder and
    // deleted in cleanupTempFolder.
    public static TemporaryFolder testTempFolder = new TemporaryFolder();
    // Number of distinct dept_id values in the generated data (dept_id is recordIndex modulo 40).
    private static final int NUM_DEPTS = 40;
    // Total number of generated employee records (10 files of 100 records each).
    private static final int NUM_EMPLOYEES = 1000;
    // Cluster size passed to updateTestCluster; also the upper bound on the number of
    // partitioners expected by testThreadsHelper.
    private static final int DRILLBITS_COUNT = 3;
    // Absolute path of the folder containing the generated JSON files; set in
    // generateTestDataAndQueries.
    private static String empTableLocation;
    // GROUP BY query over the generated table; set in generateTestDataAndQueries.
    private static String groupByQuery;

    /**
     * Creates the class-wide temporary folder that the generated test table is written into.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @BeforeClass
    public static void setupTempFolder() throws IOException {
        TestPartitionSender.testTempFolder.create();
    }

    /**
     * Generates the employee test table and the GROUP BY query that runs against it.
     *
     * <p>A new sub-folder of {@code testTempFolder} receives {@code NUM_EMPLOYEES} records split
     * into files of 100 records each, named {@code <fileIndex>.json}. Every record is one JSON
     * object per line with the keys {@code emp_id}, {@code emp_name} and {@code dept_id}, where
     * {@code dept_id} is {@code emp_id % NUM_DEPTS}. The folder path is stored in
     * {@code empTableLocation} and the query in {@code groupByQuery}.
     *
     * <p>NOTE(review): this relies on {@code testTempFolder} having been created already (see
     * {@code setupTempFolder}); the relative order of two {@code @BeforeClass} methods in one
     * class is presumably not guaranteed by JUnit -- confirm.
     *
     * @throws Exception if the folder or any data file cannot be created or written
     */
    @BeforeClass
    public static void generateTestDataAndQueries() throws Exception {
        empTableLocation = testTempFolder.newFolder().getAbsolutePath();
        int empNumRecsPerFile = 100;
        int numFiles = NUM_EMPLOYEES / empNumRecsPerFile;
        for (int fileIndex = 0; fileIndex < numFiles; ++fileIndex) {
            File file = new File(empTableLocation + File.separator + fileIndex + ".json");
            int firstRecord = fileIndex * empNumRecsPerFile;
            int endRecord = firstRecord + empNumRecsPerFile;
            // try-with-resources: the writer is closed even if formatting or writing fails
            try (PrintWriter printWriter = new PrintWriter(file)) {
                for (int recordIndex = firstRecord; recordIndex < endRecord; ++recordIndex) {
                    String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d }", recordIndex, recordIndex, recordIndex % NUM_DEPTS);
                    printWriter.println(record);
                }
                // PrintWriter never throws on write errors; surface them instead of silently
                // producing a truncated table
                if (printWriter.checkError()) {
                    throw new IOException("Failed to write test data to " + file);
                }
            }
        }
        groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation);
    }

    /**
     * Deletes the class-wide temporary folder once all tests in this class have finished.
     *
     * @throws IOException declared for symmetry with the setup method
     */
    @AfterClass
    public static void cleanupTempFolder() throws IOException {
        TestPartitionSender.testTempFolder.delete();
    }

    /**
     * Main test: plans the GROUP BY query, locates its {@link HashToRandomExchange} operator and
     * runs {@code testThreadsHelper} for three combinations of session options and exchange
     * cost, expecting 1, 10 and 2 calculated sender threads respectively.
     *
     * <p>The incoming batch is an empty {@link VectorContainer} paired with a mocked
     * {@link SelectionVector4} that reports 100 records and maps index {@code i} to {@code i}.
     *
     * @throws Exception if planning fails or any scenario assertion fails
     */
    @Test
    public void testPartitionSenderCostToThreads() throws Exception {
        VectorContainer container = new VectorContainer();
        container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
        SelectionVector4 sv = Mockito.mock(SelectionVector4.class, "SelectionVector4");
        Mockito.when(sv.getCount()).thenReturn(100);
        Mockito.when(sv.getTotalCount()).thenReturn(100);
        for (int i = 0; i < 100; ++i) {
            Mockito.when(sv.get(i)).thenReturn(i);
        }
        TopNBatch.SimpleRecordBatch incoming = new TopNBatch.SimpleRecordBatch(container, sv, null);

        TestPartitionSender.updateTestCluster(DRILLBITS_COUNT, null);
        TestPartitionSender.test("ALTER SESSION SET `planner.slice_target`=1");
        String plan = TestPartitionSender.getPlanInString("EXPLAIN PLAN FOR " + groupByQuery, "json");
        System.out.println("Plan: " + plan);

        DrillbitContext drillbitContext = TestPartitionSender.getDrillbitContext();
        PhysicalPlanReader planReader = drillbitContext.getPlanReader();
        PhysicalPlan physicalPlan = planReader.readPhysicalPlan(plan);
        Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
        PlanningSet planningSet = new PlanningSet();
        FunctionImplementationRegistry registry = new FunctionImplementationRegistry(config);
        PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);

        // Pick the first HashToRandomExchange in the plan
        List<PhysicalOperator> operators = physicalPlan.getSortedOperators(false);
        HashToRandomExchange hashToRandomExchange = null;
        for (PhysicalOperator operator : operators) {
            if (operator instanceof HashToRandomExchange) {
                hashToRandomExchange = (HashToRandomExchange) operator;
                break;
            }
        }
        // Fail with a readable message instead of a NullPointerException further down
        Assert.assertNotNull("Plan should contain a HashToRandomExchange operator", hashToRandomExchange);

        OptionList options = new OptionList();

        // Scenario 1: slice_target only -> 1 thread expected
        options.add(sessionLongOption("planner.slice_target", 1L));
        this.testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 1);

        // Scenario 2: max threads raised to 10 with cost 1000 -> 10 threads expected
        options.clear();
        options.add(sessionLongOption("planner.slice_target", 1L));
        options.add(sessionLongOption("planner.partitioner_sender_max_threads", 10L));
        hashToRandomExchange.setCost(1000.0);
        this.testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 10);

        // Scenario 3: slice_target 1000, threads factor 2, cost 14000 -> 2 threads expected
        options.clear();
        options.add(sessionLongOption("planner.slice_target", 1000L));
        options.add(sessionLongOption("planner.partitioner_sender_threads_factor", 2L));
        hashToRandomExchange.setCost(14000.0);
        this.testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 2);
    }

    /** Builds a SESSION-scoped long option value with the given name and value. */
    private static OptionValue sessionLongOption(String name, long value) {
        return OptionValue.createLong(OptionValue.OptionType.SESSION, name, value);
    }

    /**
     * Runs one threading scenario against every plan fragment that contains a
     * hash-partition-sender.
     *
     * <p>For each such fragment the helper builds a {@link MockPartitionSenderRootExec} and
     * checks, in order:
     * <ol>
     *   <li>the calculated number of partitions equals {@code expectedThreadsCount};</li>
     *   <li>the number of partitioners equals {@code min(DRILLBITS_COUNT, expectedThreadsCount)};</li>
     *   <li>an outgoing batch exists for every endpoint and the per-partitioner batch counts
     *       differ by at most one between consecutive partitioners;</li>
     *   <li>after partitioning {@code incoming}, the first outgoing batch holds 100 records and
     *       all others hold 0;</li>
     *   <li>when {@link InjectExceptionTest} makes every partitioner throw, the metrics are still
     *       merged (BYTES_SENT is 5 per partitioner) and the thrown {@link IOException} carries
     *       one suppressed exception per additional partitioner.</li>
     * </ol>
     *
     * @param hashToRandomExchange exchange operator the sender is built from
     * @param drillbitContext context supplying the endpoints and used to create the fragment context
     * @param options session options passed to the parallelizer
     * @param incoming batch fed into the partitioner
     * @param registry function registry for the fragment context
     * @param planReader plan reader passed to the parallelizer
     * @param planningSet not used by this method; kept for signature compatibility
     * @param rootFragment root fragment of the plan
     * @param expectedThreadsCount number of threads the sender is expected to calculate
     * @throws Exception if fragment generation, partitioning or cleanup fails
     */
    private void testThreadsHelper(HashToRandomExchange hashToRandomExchange, DrillbitContext drillbitContext, OptionList options, RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment, int expectedThreadsCount) throws Exception {
        BitControl.QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName");
        QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(), UserBitShared.QueryId.getDefaultInstance(), drillbitContext.getBits(), planReader, rootFragment, USER_SESSION, queryContextInfo);
        // Left as a raw List on purpose: the element type is not imported by this file
        List mfEndPoints = PhysicalOperatorUtil.getIndexOrderedEndpoints(Lists.newArrayList(drillbitContext.getBits()));
        for (BitControl.PlanFragment planFragment : qwu.getFragments()) {
            if (!planFragment.getFragmentJson().contains("hash-partition-sender")) {
                continue;
            }
            MockPartitionSenderRootExec partitionSenderRootExec = null;
            FragmentContext context = null;
            try {
                context = new FragmentContext(drillbitContext, planFragment, null, registry);
                int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
                HashPartitionSender partSender = new HashPartitionSender(majorFragmentId, hashToRandomExchange, hashToRandomExchange.getExpression(), mfEndPoints);
                partitionSenderRootExec = new MockPartitionSenderRootExec(context, incoming, partSender);
                Assert.assertEquals("Number of threads calculated", expectedThreadsCount, partitionSenderRootExec.getNumberPartitions());

                partitionSenderRootExec.createPartitioner();
                PartitionerDecorator partDecor = partitionSenderRootExec.getPartitioner();
                Assert.assertNotNull(partDecor);
                List<Partitioner> partitioners = partDecor.getPartitioners();
                Assert.assertNotNull(partitioners);
                // There cannot be more partitioners than drillbits in the test cluster
                int actualThreads = Math.min(DRILLBITS_COUNT, expectedThreadsCount);
                Assert.assertEquals("Number of partitioners", actualThreads, partitioners.size());
                for (int i = 0; i < mfEndPoints.size(); ++i) {
                    Assert.assertNotNull("PartitionOutgoingBatch", partDecor.getOutgoingBatches(i));
                }

                // Outgoing batches must be spread evenly: neighbouring partitioners may differ
                // by at most one batch
                boolean isFirst = true;
                int prevBatchCountSize = 0;
                for (Partitioner part : partitioners) {
                    int batchCountSize = part.getOutgoingBatches().size();
                    if (isFirst) {
                        isFirst = false;
                    } else {
                        Assert.assertTrue(Math.abs(batchCountSize - prevBatchCountSize) <= 1);
                    }
                    prevBatchCountSize = batchCountSize;
                }

                partitionSenderRootExec.getStats().startProcessing();
                try {
                    partDecor.partitionBatch(incoming);
                } finally {
                    partitionSenderRootExec.getStats().stopProcessing();
                }
                if (actualThreads == 1) {
                    Assert.assertEquals("With single thread parent and child waitNanos should match", partitioners.get(0).getStats().getWaitNanos(), partitionSenderRootExec.getStats().getWaitNanos());
                }

                // Only the very first outgoing batch is expected to contain the 100 records
                partitioners = partDecor.getPartitioners();
                isFirst = true;
                for (Partitioner part : partitioners) {
                    List<? extends PartitionOutgoingBatch> outBatches = part.getOutgoingBatches();
                    for (PartitionOutgoingBatch partOutBatch : outBatches) {
                        int recordCount = ((VectorAccessible) partOutBatch).getRecordCount();
                        if (isFirst) {
                            Assert.assertEquals("RecordCount", 100L, recordCount);
                            isFirst = false;
                        } else {
                            Assert.assertEquals("RecordCount", 0L, recordCount);
                        }
                    }
                }

                // Every partitioner throws; stats must still be merged and the extra
                // exceptions attached as suppressed
                partitionSenderRootExec.getStats().startProcessing();
                try {
                    partDecor.executeMethodLogic(new InjectExceptionTest());
                    Assert.fail("Should throw IOException here");
                } catch (IOException ioe) {
                    UserBitShared.OperatorProfile.Builder oPBuilder = UserBitShared.OperatorProfile.newBuilder();
                    partitionSenderRootExec.getStats().addAllMetrics(oPBuilder);
                    List<UserBitShared.MetricValue> metrics = oPBuilder.getMetricList();
                    for (UserBitShared.MetricValue metric : metrics) {
                        if (PartitionSenderRootExec.Metric.BYTES_SENT.metricId() == metric.getMetricId()) {
                            Assert.assertEquals("Should add metricValue irrespective of exception", 5L * actualThreads, metric.getLongValue());
                        }
                        if (PartitionSenderRootExec.Metric.SENDING_THREADS_COUNT.metricId() == metric.getMetricId()) {
                            Assert.assertEquals(actualThreads, metric.getLongValue());
                        }
                    }
                    Assert.assertEquals(actualThreads - 1, ioe.getSuppressed().length);
                } finally {
                    partitionSenderRootExec.getStats().stopProcessing();
                }
            } finally {
                // Either object may be null if its constructor threw; guard so the cleanup does
                // not replace the real failure with a NullPointerException, and make sure the
                // context is closed even when closing the sender fails.
                try {
                    if (partitionSenderRootExec != null) {
                        partitionSenderRootExec.close();
                    }
                } finally {
                    if (context != null) {
                        context.close();
                    }
                }
            }
        }
    }

    /**
     * Checks the arithmetic used to distribute outgoing batches over partitioners.
     *
     * <p>For 1000 random pairs (batch count in 1..1000, partition count in 1..32) the batches are
     * split into {@code min(batches, partitions)} consecutive ranges of {@code divisor} batches,
     * with the first {@code batches % partitions} ranges one batch longer. The end of the last
     * range must equal the total batch count.
     *
     * @throws Exception never thrown by the body; declared by the original signature
     */
    @Test
    public void testAlgorithm() throws Exception {
        Random generator = new Random();
        int iteration = 0;
        while (iteration < 1000) {
            int outGoingBatchCount = generator.nextInt(1000) + 1;
            int numberPartitions = generator.nextInt(32) + 1;
            int actualPartitions = Math.min(outGoingBatchCount, numberPartitions);
            int divisor = Math.max(1, outGoingBatchCount / actualPartitions);
            int longTail = outGoingBatchCount % actualPartitions;

            // Walk the ranges; only the running end index matters for the final check
            int endIndex = 0;
            for (int partition = 0; partition < actualPartitions; partition++) {
                int width = partition < longTail ? divisor + 1 : divisor;
                endIndex += width;
            }

            boolean coversAllBatches = endIndex == outGoingBatchCount;
            Assert.assertTrue("endIndex can not be > outGoingBatchCount", coversAllBatches);
            iteration++;
        }
    }

    private static class InjectExceptionTest
    implements PartitionerDecorator.GeneralExecuteIface {
        private InjectExceptionTest() {
        }

        public void execute(Partitioner partitioner) throws IOException {
            partitioner.getStats().addLongStat((MetricDef)PartitionSenderRootExec.Metric.BYTES_SENT, 5L);
            throw new IOException("Test exception handling");
        }
    }

    /**
     * Test subclass of {@link PartitionSenderRootExec} that exposes the calculated partition
     * count and the operator stats, and whose {@code close()} only closes the operator context.
     */
    private static class MockPartitionSenderRootExec
    extends PartitionSenderRootExec {
        /**
         * Passes all arguments straight through to the parent constructor.
         *
         * @param context fragment context the sender runs in
         * @param incoming batch supplying the records to partition
         * @param operator sender configuration
         * @throws OutOfMemoryException propagated from the parent constructor
         */
        public MockPartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException {
            super(context, incoming, operator);
        }

        /** Closes the operator context held in {@code oContext}, and nothing else. */
        public void close() throws Exception {
            AutoCloseable operatorContext = (AutoCloseable) this.oContext;
            operatorContext.close();
        }

        /** Returns the value of the {@code numberPartitions} field. */
        public int getNumberPartitions() {
            int partitions = this.numberPartitions;
            return partitions;
        }

        /** Returns the operator stats held in the {@code stats} field. */
        public OperatorStats getStats() {
            OperatorStats operatorStats = this.stats;
            return operatorStats;
        }
    }
}

