/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.PlanTestBase;
import org.apache.drill.TestBuilder;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestLocalExchange
extends PlanTestBase {
    // Temporary folder that holds the generated JSON tables. It is a plain static field here
    // (no rule annotation is visible), so it is created and deleted explicitly by
    // setupTempFolder() and cleanupTempFolder().
    public static TemporaryFolder testTempFolder = new TemporaryFolder();
    // Same value that setupClusterSize() passes to updateTestCluster and that verifyAssignment()
    // uses as the upper bound on partition senders per major fragment.
    // Presumably the number of Drillbits in the test cluster - confirm against updateTestCluster.
    private static final int CLUSTER_SIZE = 3;
    // Quoted operator names, counted as raw substrings of the JSON plan text in testHelper().
    private static final String MUX_EXCHANGE = "\"unordered-mux-exchange\"";
    private static final String DEMUX_EXCHANGE = "\"unordered-demux-exchange\"";
    // Unquoted operator names, compared against the "pop" value of parsed plan nodes in
    // jsonExchangeOrderChecker().
    private static final String MUX_EXCHANGE_CONST = "unordered-mux-exchange";
    private static final String DEMUX_EXCHANGE_CONST = "unordered-demux-exchange";
    private static final String HASH_EXCHANGE = "hash-to-random-exchange";
    // Session (user name "foo") handed to PARALLELIZER.getFragments when the plan is parallelized.
    private static final UserSession USER_SESSION = UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build();
    // Parallelizer used to build fragment wrappers and plan fragments for the verification step.
    // NOTE(review): the meaning of the four constructor arguments is not visible in this file.
    private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(1L, 6, 1000, 1.2);
    // Sizes of the generated data set: the same values appear as moduli and loop bounds in
    // generateTestDataAndQueries() (dept_id = emp_id % 40, 1000 employee records,
    // mng_id = some_id = emp_id % 1, i.e. always 0).
    private static final int NUM_DEPTS = 40;
    private static final int NUM_EMPLOYEES = 1000;
    private static final int NUM_MNGRS = 1;
    private static final int NUM_IDS = 1;
    // Absolute paths of the folders holding the generated employee and department JSON files.
    private static String empTableLocation;
    private static String deptTableLocation;
    // Queries under test, built once the table locations are known.
    private static String groupByQuery;
    private static String joinQuery;
    // Expected column names and rows for the two queries, consumed by testHelper().
    private static String[] joinQueryBaselineColumns;
    private static String[] groupByQueryBaselineColumns;
    private static List<Object[]> groupByQueryBaselineValues;
    private static List<Object[]> joinQueryBaselineValues;

    /**
     * Resizes the shared test cluster to {@link #CLUSTER_SIZE} before any test in this class runs.
     * The second argument to {@code updateTestCluster} is left {@code null}.
     */
    @BeforeClass
    public static void setupClusterSize() {
        updateTestCluster(CLUSTER_SIZE, null);
    }

    /**
     * Creates the temporary folder that will hold the generated test tables.
     * The folder is managed manually here and is removed again in cleanupTempFolder().
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @BeforeClass
    public static void setupTempFolder() throws IOException {
        testTempFolder.create();
    }

    /**
     * Generates the employee and department JSON tables in fresh temporary folders and prepares
     * the queries and expected results used by the tests.
     *
     * <ul>
     *   <li>Employee table: {@code NUM_EMPLOYEES} records spread over files of 100 records each,
     *       with columns {@code emp_id}, {@code emp_name}, {@code dept_id} (= emp_id % NUM_DEPTS),
     *       {@code mng_id} (= emp_id % NUM_MNGRS) and {@code some_id} (= emp_id % NUM_IDS).</li>
     *   <li>Department table: {@code NUM_DEPTS} records spread over files of 4 records each, with
     *       columns {@code dept_id} and {@code dept_name}.</li>
     * </ul>
     *
     * NOTE(review): this method needs {@code testTempFolder} to be created already, i.e. it relies
     * on setupTempFolder() running first; confirm that the runner's ordering of the class-level
     * setup methods guarantees that.
     *
     * @throws Exception if a folder or data file cannot be created or written
     */
    @BeforeClass
    public static void generateTestDataAndQueries() throws Exception {
        // Employee table.
        empTableLocation = testTempFolder.newFolder().getAbsolutePath();
        final int empNumRecsPerFile = 100;
        for (int fileIndex = 0; fileIndex < NUM_EMPLOYEES / empNumRecsPerFile; fileIndex++) {
            final List<String> records = Lists.newArrayList();
            final int firstRecord = fileIndex * empNumRecsPerFile;
            final int endRecord = (fileIndex + 1) * empNumRecsPerFile;
            for (int recordIndex = firstRecord; recordIndex < endRecord; recordIndex++) {
                records.add(String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d, \"mng_id\" : %d, \"some_id\" : %d }", recordIndex, recordIndex, recordIndex % NUM_DEPTS, recordIndex % NUM_MNGRS, recordIndex % NUM_IDS));
            }
            writeJsonRecords(new File(empTableLocation + File.separator + fileIndex + ".json"), records);
        }

        // Department table.
        deptTableLocation = testTempFolder.newFolder().getAbsolutePath();
        final int deptNumRecsPerFile = 4;
        for (int fileIndex = 0; fileIndex < NUM_DEPTS / deptNumRecsPerFile; fileIndex++) {
            final List<String> records = Lists.newArrayList();
            final int firstRecord = fileIndex * deptNumRecsPerFile;
            final int endRecord = (fileIndex + 1) * deptNumRecsPerFile;
            for (int recordIndex = firstRecord; recordIndex < endRecord; recordIndex++) {
                records.add(String.format("{ \"dept_id\" : %d, \"dept_name\" : \"Department %d\" }", recordIndex, recordIndex));
            }
            writeJsonRecords(new File(deptTableLocation + File.separator + fileIndex + ".json"), records);
        }

        // Queries under test.
        groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation);
        joinQuery = String.format("SELECT e.emp_name, d.dept_name FROM dfs.`%s` e JOIN dfs.`%s` d ON e.dept_id = d.dept_id", empTableLocation, deptTableLocation);

        // Expected GROUP BY result: every department has the same number of employees.
        groupByQueryBaselineColumns = new String[]{"dept_id", "numEmployees"};
        groupByQueryBaselineValues = Lists.newArrayList();
        final int numOccurrences = NUM_EMPLOYEES / NUM_DEPTS;
        for (int i = 0; i < NUM_DEPTS; i++) {
            groupByQueryBaselineValues.add(new Object[]{(long) i, (long) numOccurrences});
        }

        // Expected JOIN result: one row per employee, paired with the name of its department.
        joinQueryBaselineColumns = new String[]{"emp_name", "dept_name"};
        joinQueryBaselineValues = Lists.newArrayList();
        for (int i = 0; i < NUM_EMPLOYEES; i++) {
            final String employee = String.format("Employee %d", i);
            final String dept = String.format("Department %d", i % NUM_DEPTS);
            joinQueryBaselineValues.add(new Object[]{employee, dept});
        }
    }

    /**
     * Writes the given records, one per line, to {@code file} as UTF-8 text.
     * The writer is always closed, and a write error that PrintWriter would otherwise swallow is
     * reported as an exception so that broken test data is not mistaken for a query failure.
     *
     * @param file    destination file; created or truncated
     * @param records lines to write
     * @throws IOException if the file cannot be opened or a write fails
     */
    private static void writeJsonRecords(File file, List<String> records) throws IOException {
        try (PrintWriter writer = new PrintWriter(file, "UTF-8")) {
            for (String record : records) {
                writer.println(record);
            }
            // checkError() flushes and reports any failure that occurred while writing.
            if (writer.checkError()) {
                throw new IOException("Failed to write test data to " + file);
            }
        }
    }

    /**
     * Applies the session options every exchange test depends on: slice target 1, broadcast join
     * disabled, and the mux / demux exchanges switched on or off as requested.
     *
     * @param isMuxOn   value for {@code planner.enable_mux_exchange}
     * @param isDeMuxOn value for {@code planner.enable_demux_exchange}
     * @throws Exception if one of the ALTER SESSION statements fails
     */
    public static void setupHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
        final String[] sessionStatements = {
            "ALTER SESSION SET `planner.slice_target`=1",
            "ALTER SESSION SET `planner.enable_broadcast_join`=false",
            "ALTER SESSION SET `planner.enable_mux_exchange`=" + isMuxOn,
            "ALTER SESSION SET `planner.enable_demux_exchange`=" + isDeMuxOn
        };
        // Statements are executed in the order listed above.
        for (String statement : sessionStatements) {
            test(statement);
        }
    }

    /**
     * Groups the employee table by three columns with only the mux exchange enabled, checks that
     * the plan contains exactly one project/mux/hash-exchange sequence whose hash expression nests
     * three {@code hash64asdouble} calls, and then verifies the query result.
     *
     * @throws Exception if planning, plan verification or query execution fails
     */
    @Test
    public void testGroupByMultiFields() throws Exception {
        test("ALTER SESSION SET `planner.slice_target`=1");
        test("ALTER SESSION SET `planner.enable_mux_exchange`=true");
        test("ALTER SESSION SET `planner.enable_demux_exchange`=false");

        final String groupByMultipleQuery = String.format("SELECT dept_id, mng_id, some_id, count(*) as numEmployees FROM dfs.`%s` e GROUP BY dept_id, mng_id, some_id", empTableLocation);
        final String[] groupByMultipleQueryBaselineColumns = new String[]{"dept_id", "mng_id", "some_id", "numEmployees"};

        final String plan = getPlanInString("EXPLAIN PLAN FOR " + groupByMultipleQuery, "json");
        System.out.println("Plan: " + plan);

        jsonExchangeOrderChecker(plan, false, 1, "castint\\(hash64asdouble\\(.*, hash64asdouble\\(.*, hash64asdouble\\(.*\\) \\) \\) \\) ");

        // mng_id and some_id are always 0 in the generated data, so there is one group per
        // department, each holding the same number of employees.
        final int numOccurrences = NUM_EMPLOYEES / NUM_DEPTS;
        final TestBuilder builder = TestLocalExchange.testBuilder().sqlQuery(groupByMultipleQuery).unOrdered().baselineColumns(groupByMultipleQueryBaselineColumns);
        for (int i = 0; i < NUM_DEPTS; i++) {
            // dept_id is passed as a long, the same way the single-column GROUP BY baseline in
            // generateTestDataAndQueries() is built, so all baseline columns share one type.
            builder.baselineValues((long) i, 0L, 0L, (long) numOccurrences);
        }
        builder.go();
    }

    /** Runs the GROUP BY scenario with both the mux and the demux exchange disabled. */
    @Test
    public void testGroupBy_NoMux_NoDeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = false;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the JOIN scenario with both the mux and the demux exchange disabled. */
    @Test
    public void testJoin_NoMux_NoDeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = false;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the GROUP BY scenario with the mux exchange enabled and the demux exchange disabled. */
    @Test
    public void testGroupBy_Mux_NoDeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = false;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the JOIN scenario with the mux exchange enabled and the demux exchange disabled. */
    @Test
    public void testJoin_Mux_NoDeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = false;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the GROUP BY scenario with the mux exchange disabled and the demux exchange enabled. */
    @Test
    public void testGroupBy_NoMux_DeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = true;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the JOIN scenario with the mux exchange disabled and the demux exchange enabled. */
    @Test
    public void testJoin_NoMux_DeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = true;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the GROUP BY scenario with both the mux and the demux exchange enabled. */
    @Test
    public void testGroupBy_Mux_DeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = true;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Runs the JOIN scenario with both the mux and the demux exchange enabled. */
    @Test
    public void testJoin_Mux_DeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = true;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /**
     * Runs the GROUP BY query through {@code testHelper}; the plan is expected to contain one mux
     * exchange when mux is enabled and one demux exchange when demux is enabled, otherwise none.
     *
     * @param isMuxOn   whether the mux exchange is enabled for this run
     * @param isDeMuxOn whether the demux exchange is enabled for this run
     * @throws Exception if the underlying helper fails
     */
    private static void testGroupByHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
        int expectedMuxes = 0;
        if (isMuxOn) {
            expectedMuxes = 1;
        }
        int expectedDeMuxes = 0;
        if (isDeMuxOn) {
            expectedDeMuxes = 1;
        }
        testHelper(isMuxOn, isDeMuxOn, groupByQuery, expectedMuxes, expectedDeMuxes, groupByQueryBaselineColumns, groupByQueryBaselineValues);
    }

    /**
     * Runs the JOIN query through {@code testHelper}; the plan is expected to contain two mux
     * exchanges when mux is enabled and two demux exchanges when demux is enabled, otherwise none.
     *
     * @param isMuxOn   whether the mux exchange is enabled for this run
     * @param isDeMuxOn whether the demux exchange is enabled for this run
     * @throws Exception if the underlying helper fails
     */
    public static void testJoinHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
        int expectedMuxes = 0;
        if (isMuxOn) {
            expectedMuxes = 2;
        }
        int expectedDeMuxes = 0;
        if (isDeMuxOn) {
            expectedDeMuxes = 2;
        }
        testHelper(isMuxOn, isDeMuxOn, joinQuery, expectedMuxes, expectedDeMuxes, joinQueryBaselineColumns, joinQueryBaselineValues);
    }

    /**
     * Shared driver for the exchange tests: configures the session, inspects the JSON plan of
     * {@code query}, runs the query against the given baseline and finally verifies how the
     * partition senders were parallelized.
     *
     * @param isMuxOn            whether the mux exchange is enabled
     * @param isDeMuxOn          whether the demux exchange is enabled
     * @param query              SQL query to plan and execute
     * @param expectedNumMuxes   number of mux exchanges expected in the plan
     * @param expectedNumDeMuxes number of demux exchanges expected in the plan
     * @param baselineColumns    expected result column names
     * @param baselineValues     expected result rows, in any order
     * @throws Exception if planning, a plan check or query execution fails
     */
    private static void testHelper(boolean isMuxOn, boolean isDeMuxOn, String query, int expectedNumMuxes, int expectedNumDeMuxes, String[] baselineColumns, List<Object[]> baselineValues) throws Exception {
        setupHelper(isMuxOn, isDeMuxOn);

        final String plan = getPlanInString("EXPLAIN PLAN FOR " + query, "json");
        System.out.println("Plan: " + plan);

        // With mux enabled the hash-expression field name is expected twice per mux exchange plus
        // once per demux exchange in the plan text; without mux it must not appear at all.
        final long expectedHashExprs = isMuxOn ? 2 * expectedNumMuxes + expectedNumDeMuxes : 0L;
        final long actualHashExprs = StringUtils.countMatches(plan, "E_X_P_R_H_A_S_H_F_I_E_L_D");
        Assert.assertEquals("HashExpr on the hash column should not happen", expectedHashExprs, actualHashExprs);
        if (isMuxOn) {
            jsonExchangeOrderChecker(plan, isDeMuxOn, expectedNumMuxes, "castint\\(hash64asdouble\\(.*\\) \\) ");
        }

        final long muxesInPlan = StringUtils.countMatches(plan, MUX_EXCHANGE);
        Assert.assertEquals("Wrong number of MuxExchanges are present in the plan", (long) expectedNumMuxes, muxesInPlan);
        final long deMuxesInPlan = StringUtils.countMatches(plan, DEMUX_EXCHANGE);
        Assert.assertEquals("Wrong number of DeMuxExchanges are present in the plan", (long) expectedNumDeMuxes, deMuxesInPlan);

        // Execute the query and compare the result with the baseline, ignoring row order.
        final TestBuilder builder = TestLocalExchange.testBuilder().sqlQuery(query).unOrdered().baselineColumns(baselineColumns);
        for (Object[] expectedRow : baselineValues) {
            builder.baselineValues(expectedRow);
        }
        builder.go();

        testHelperVerifyPartitionSenderParallelization(plan, isMuxOn, isDeMuxOn);
    }

    /**
     * Walks the "graph" array of a JSON plan and checks that every project carrying the hash
     * expression field is followed by the expected operator sequence:
     * project (with hash expr) -> unordered-mux-exchange -> hash-to-random-exchange
     * -> [unordered-demux-exchange, only when demux is enabled] -> project.
     *
     * @param plan             query plan as JSON text
     * @param isDemuxEnabled   whether a demux exchange is expected after the hash exchange
     * @param expectedNumMuxes number of such sequences expected in the plan
     * @param hashExprPattern  regular expression the whole hash expression must match
     * @throws Exception if the plan cannot be parsed as JSON
     */
    private static void jsonExchangeOrderChecker(String plan, boolean isDemuxEnabled, int expectedNumMuxes, String hashExprPattern) throws Exception {
        JSONObject planObj = (JSONObject)new JSONParser().parse(plan);
        Assert.assertNotNull((String)"Corrupted query plan: null", (Object)planObj);
        JSONArray graphArray = (JSONArray)planObj.get((Object)"graph");
        Assert.assertNotNull((String)"No graph array present", (Object)graphArray);
        // i only advances while a sequence is being tracked (see the 'continue' below), so
        // (i - k) is the distance, in graph elements, from the project that carried the hash expr.
        int i = 0;
        // Value of i at the project where the hash expr was found.
        int k = 0;
        int prevExprsArraySize = 0;
        // True from the moment a hash-expr project is found until its closing project is checked.
        boolean foundExpr = false;
        // Number of hash-expr projects seen so far.
        int muxesCount = 0;
        for (Object object : graphArray) {
            JSONArray exprsArray;
            JSONObject popObj = (JSONObject)object;
            if (popObj.containsKey((Object)"pop") && popObj.get((Object)"pop").equals("project") && popObj.containsKey((Object)"exprs")) {
                exprsArray = (JSONArray)popObj.get((Object)"exprs");
                for (Object exprObj : exprsArray) {
                    JSONObject expr = (JSONObject)exprObj;
                    // Skip every expression that is not the hash expression field.
                    if (!expr.containsKey((Object)"ref") || !expr.get((Object)"ref").equals("`E_X_P_R_H_A_S_H_F_I_E_L_D`")) continue;
                    String hashField = (String)expr.get((Object)"expr");
                    Assert.assertNotNull((String)"HashExpr field can not be null", (Object)hashField);
                    Assert.assertTrue((String)"HashExpr field does not match pattern", (boolean)hashField.matches(hashExprPattern));
                    // Start tracking a new sequence at this position.
                    k = i;
                    foundExpr = true;
                    ++muxesCount;
                    break;
                }
                // NOTE(review): this also runs for the closing project of a sequence (foundExpr is
                // still true there), overwriting the size recorded for the starting project.
                if (foundExpr) {
                    prevExprsArraySize = exprsArray.size();
                }
            }
            // Outside a sequence nothing is checked and i is not advanced.
            if (!foundExpr) continue;
            // First element after the hash-expr project.
            if (k == i - 1) {
                Assert.assertTrue((String)"UnorderedMux should follow Project with HashExpr", (popObj.containsKey((Object)"pop") && popObj.get((Object)"pop").equals(MUX_EXCHANGE_CONST) ? 1 : 0) != 0);
            }
            // Second element after the hash-expr project.
            if (k == i - 2) {
                Assert.assertTrue((String)"HashToRandomExchange should follow UnorderedMux which should follow Project with HashExpr", (popObj.containsKey((Object)"pop") && popObj.get((Object)"pop").equals(HASH_EXCHANGE) ? 1 : 0) != 0);
                Assert.assertTrue((String)"HashToRandomExchnage should use hashExpr", (popObj.containsKey((Object)"expr") && popObj.get((Object)"expr").equals("`E_X_P_R_H_A_S_H_F_I_E_L_D`") ? 1 : 0) != 0);
            }
            // Third element: only constrained when demux is enabled.
            if (isDemuxEnabled && k == i - 3) {
                Assert.assertTrue((String)"UnorderdDemuxExchange should follow HashToRandomExchange", (popObj.containsKey((Object)"pop") && popObj.get((Object)"pop").equals(DEMUX_EXCHANGE_CONST) ? 1 : 0) != 0);
                Assert.assertTrue((String)"UnorderdDemuxExchange should use hashExpr", (popObj.containsKey((Object)"expr") && popObj.get((Object)"expr").equals("`E_X_P_R_H_A_S_H_F_I_E_L_D`") ? 1 : 0) != 0);
            }
            // Closing project: fourth element with demux, third element without.
            if (isDemuxEnabled && k == i - 4 || !isDemuxEnabled && k == i - 3) {
                Assert.assertTrue((String)"Should be project without hashexpr", (popObj.containsKey((Object)"pop") && popObj.get((Object)"pop").equals("project") ? 1 : 0) != 0);
                exprsArray = (JSONArray)popObj.get((Object)"exprs");
                Assert.assertNotNull((String)"Project should have some fields", (Object)exprsArray);
                // NOTE(review): when the closing project has "exprs", prevExprsArraySize was just
                // overwritten with this same array's size above, so this comparison cannot fail
                // and does not verify the "one less" relation stated in the message.
                Assert.assertEquals((String)"Number of fields in closing project should be one less then in starting project", (long)prevExprsArraySize, (long)exprsArray.size());
                // Reset the tracking state for the next possible sequence.
                k = 0;
                foundExpr = false;
                prevExprsArraySize = 0;
            }
            ++i;
        }
        Assert.assertEquals((String)"Number of Project/Mux/HashExchange/... ", (long)expectedNumMuxes, (long)muxesCount);
    }

    /**
     * Parallelizes the given plan and verifies the placement of fragments whose JSON contains a
     * "hash-partition-sender".
     *
     * The major fragments that send through a hash-to-random exchange or an unordered demux
     * exchange are collected from the fragment tree; the endpoints assigned to partition-sender
     * plan fragments are grouped by major fragment id; then {@code verifyAssignment} is applied to
     * the hash-to-random fragments when mux is on and to the demux fragments when demux is on.
     *
     * @param plan      query plan as JSON text
     * @param isMuxOn   whether to verify the fragments sending via hash-to-random exchange
     * @param isDeMuxOn whether to verify the fragments sending via demux exchange
     * @throws Exception if the plan cannot be read or parallelized
     */
    private static void testHelperVerifyPartitionSenderParallelization(String plan, boolean isMuxOn, boolean isDeMuxOn) throws Exception {
        final DrillbitContext drillbitContext = getDrillbitContext();
        final PhysicalPlanReader planReader = drillbitContext.getPlanReader();
        final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);

        // Major fragment ids, split by the kind of exchange the fragment sends through.
        final List<Integer> deMuxFragments = Lists.newLinkedList();
        final List<Integer> htrFragments = Lists.newLinkedList();
        final PlanningSet planningSet = new PlanningSet();

        // The fragment wrappers must be initialized before major fragment ids can be looked up.
        PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);
        findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);

        final BitControl.QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName");
        final QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(), UserBitShared.QueryId.getDefaultInstance(), drillbitContext.getBits(), planReader, rootFragment, USER_SESSION, queryContextInfo);

        // Major fragment id -> endpoints assigned to its partition-sender plan fragments.
        final ArrayListMultimap<Integer, CoordinationProtos.DrillbitEndpoint> partitionSenderMap = ArrayListMultimap.create();
        for (BitControl.PlanFragment planFragment : qwu.getFragments()) {
            if (!planFragment.getFragmentJson().contains("hash-partition-sender")) {
                continue;
            }
            final int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
            final CoordinationProtos.DrillbitEndpoint assignedEndpoint = planFragment.getAssignment();
            partitionSenderMap.put(majorFragmentId, assignedEndpoint);
        }

        if (isMuxOn) {
            verifyAssignment(htrFragments, partitionSenderMap);
        }
        if (isDeMuxOn) {
            verifyAssignment(deMuxFragments, partitionSenderMap);
        }
    }

    /**
     * Recursively walks the fragment tree and records the major fragment id of every fragment
     * whose sending exchange is an unordered demux exchange or a hash-to-random exchange.
     *
     * @param currentRootFragment fragment to inspect; {@code null} is ignored
     * @param planningSet         used to look up the major fragment id of a fragment
     * @param deMuxFragments      receives ids of fragments sending via unordered demux exchange
     * @param htrFragments        receives ids of fragments sending via hash-to-random exchange
     */
    private static void findFragmentsWithPartitionSender(Fragment currentRootFragment, PlanningSet planningSet, List<Integer> deMuxFragments, List<Integer> htrFragments) {
        if (currentRootFragment == null) {
            return;
        }

        final Exchange outgoing = currentRootFragment.getSendingExchange();
        if (outgoing != null) {
            final int fragmentId = planningSet.get(currentRootFragment).getMajorFragmentId();
            // The demux check comes first; the hash-to-random check applies only if it fails.
            if (outgoing instanceof UnorderedDeMuxExchange) {
                deMuxFragments.add(fragmentId);
            } else if (outgoing instanceof HashToRandomExchange) {
                htrFragments.add(fragmentId);
            }
        }

        // Descend into every fragment this one receives from.
        for (Fragment.ExchangeFragmentPair childPair : currentRootFragment.getReceivingExchangePairs()) {
            findFragmentsWithPartitionSender(childPair.getNode(), planningSet, deMuxFragments, htrFragments);
        }
    }

    /**
     * Asserts that each listed major fragment has at least one and at most {@link #CLUSTER_SIZE}
     * partition-sender assignments, and that no endpoint appears more than once among them.
     *
     * @param fragmentList       major fragment ids to check; must not be empty
     * @param partitionSenderMap major fragment id -> endpoints assigned to its partition senders
     */
    private static void verifyAssignment(List<Integer> fragmentList, ArrayListMultimap<Integer, CoordinationProtos.DrillbitEndpoint> partitionSenderMap) {
        Assert.assertTrue(fragmentList.size() > 0);
        for (Integer majorFragmentId : fragmentList) {
            final List<CoordinationProtos.DrillbitEndpoint> assignments = partitionSenderMap.get(majorFragmentId);
            Assert.assertNotNull(assignments);
            Assert.assertTrue(assignments.size() > 0);
            Assert.assertTrue(String.format("Number of partition senders in major fragment [%d] is more than expected", majorFragmentId), CLUSTER_SIZE >= assignments.size());
            // Copying into a set drops duplicates, so equal sizes mean every endpoint is distinct.
            Assert.assertTrue("Some endpoints have more than one fragment that has ParitionSender", ImmutableSet.copyOf(assignments).size() == assignments.size());
        }
    }

    /**
     * Deletes the temporary folder, and with it the generated test tables, once all tests in
     * this class have run.
     *
     * @throws IOException declared by the signature; the body only calls testTempFolder.delete()
     */
    @AfterClass
    public static void cleanupTempFolder() throws IOException {
        testTempFolder.delete();
    }
}

