package org.apache.drill.exec.physical.impl;

import java.io.File;
import java.io.PrintWriter;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.TestBuilder;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/TestLocalExchange.class */
public class TestLocalExchange extends PlanTestBase {
    // Passed to updateTestCluster(...) in setupClusterSize(); also the upper bound on the number of
    // partition senders per major fragment checked in verifyAssignment().
    private static final int CLUSTER_SIZE = 3;
    // Quoted operator names: counted as raw substrings of the JSON plan text in testHelper().
    private static final String MUX_EXCHANGE = "\"unordered-mux-exchange\"";
    private static final String DEMUX_EXCHANGE = "\"unordered-demux-exchange\"";
    // Unquoted operator names: compared against the parsed "pop" value in jsonExchangeOrderChecker().
    private static final String MUX_EXCHANGE_CONST = "unordered-mux-exchange";
    private static final String DEMUX_EXCHANGE_CONST = "unordered-demux-exchange";
    private static final String HASH_EXCHANGE = "hash-to-random-exchange";
    // Sub-directory names under the test root; also used as the dfs table names in the queries.
    private static final String EMPT_TABLE = "empTable";
    private static final String DEPT_TABLE = "deptTable";
    // Session for user "foo", handed to PARALLELIZER.getFragments(...) when re-planning the query.
    private static final UserSession USER_SESSION = UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build();
    // NOTE(review): the meaning of the four constructor arguments is not visible in this file —
    // confirm against SimpleParallelizer before changing them.
    private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(1, 6, 1000, 1.2d);
    // Number of distinct dept_id values (dept_id = emp_id % NUM_DEPTS) and of expected group-by rows.
    private static final int NUM_DEPTS = 40;
    // Total number of employee rows in the generated employee table.
    private static final int NUM_EMPLOYEES = 1000;
    // Presumably the number of distinct mng_id / some_id values; with a value of 1 the generated
    // mng_id and some_id columns are always 0.
    private static final int NUM_MNGRS = 1;
    private static final int NUM_IDS = 1;
    // Queries and expected results shared by the tests; populated once in generateTestDataAndQueries().
    private static String groupByQuery;
    private static String joinQuery;
    private static String[] joinQueryBaselineColumns;
    private static String[] groupByQueryBaselineColumns;
    private static List<Object[]> groupByQueryBaselineValues;
    private static List<Object[]> joinQueryBaselineValues;

    /**
     * Calls {@code updateTestCluster} with {@link #CLUSTER_SIZE} and a {@code null} second argument
     * before any test in this class runs.
     * NOTE(review): presumably this resizes the shared test cluster to CLUSTER_SIZE drillbits with
     * the default configuration — confirm against the base class.
     */
    @BeforeClass
    public static void setupClusterSize() {
        updateTestCluster(CLUSTER_SIZE, null);
    }

    /**
     * Generates the JSON test tables and prepares the shared queries and expected results.
     *
     * <p>Writes {@link #NUM_EMPLOYEES} employee rows and {@link #NUM_DEPTS} department rows, each
     * spread evenly over ten {@code <n>.json} files in their own sub-directory of the test root,
     * then builds the group-by and join queries together with their baseline columns and values.
     *
     * @throws Exception if a table directory or data file cannot be created or written
     */
    @BeforeClass
    public static void generateTestDataAndQueries() throws Exception {
        final int filesPerTable = 10;

        // Employee table: consecutive emp_id ranges, one range per file.
        final File empTableDir = dirTestWatcher.makeRootSubDir(Paths.get(EMPT_TABLE));
        final int employeesPerFile = NUM_EMPLOYEES / filesPerTable;
        for (int fileIndex = 0; fileIndex < filesPerTable; fileIndex++) {
            final File dataFile = new File(empTableDir, fileIndex + ".json");
            // try-with-resources guarantees the writer is closed even if formatting or writing fails.
            try (PrintWriter out = new PrintWriter(dataFile)) {
                final int firstEmpId = fileIndex * employeesPerFile;
                for (int empId = firstEmpId; empId < firstEmpId + employeesPerFile; empId++) {
                    out.println(String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d, \"mng_id\" : %d, \"some_id\" : %d }", empId, empId, empId % NUM_DEPTS, empId % NUM_MNGRS, empId % NUM_IDS));
                }
                // PrintWriter never throws on write failures; surface them instead of running on truncated data.
                Assert.assertFalse("Failed to write test data file " + dataFile, out.checkError());
            }
        }

        // Department table: consecutive dept_id ranges, one range per file.
        final File deptTableDir = dirTestWatcher.makeRootSubDir(Paths.get(DEPT_TABLE));
        final int deptsPerFile = NUM_DEPTS / filesPerTable;
        for (int fileIndex = 0; fileIndex < filesPerTable; fileIndex++) {
            final File dataFile = new File(deptTableDir, fileIndex + ".json");
            try (PrintWriter out = new PrintWriter(dataFile)) {
                final int firstDeptId = fileIndex * deptsPerFile;
                for (int deptId = firstDeptId; deptId < firstDeptId + deptsPerFile; deptId++) {
                    out.println(String.format("{ \"dept_id\" : %d, \"dept_name\" : \"Department %d\" }", deptId, deptId));
                }
                Assert.assertFalse("Failed to write test data file " + dataFile, out.checkError());
            }
        }

        groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", EMPT_TABLE);
        joinQuery = String.format("SELECT e.emp_name, d.dept_name FROM dfs.`%s` e JOIN dfs.`%s` d ON e.dept_id = d.dept_id", EMPT_TABLE, DEPT_TABLE);

        // dept_id = emp_id % NUM_DEPTS, so every department gets the same number of employees.
        final long employeesPerDept = NUM_EMPLOYEES / NUM_DEPTS;
        groupByQueryBaselineColumns = new String[]{"dept_id", "numEmployees"};
        groupByQueryBaselineValues = Lists.newArrayList();
        for (int deptId = 0; deptId < NUM_DEPTS; deptId++) {
            groupByQueryBaselineValues.add(new Object[]{(long) deptId, employeesPerDept});
        }

        joinQueryBaselineColumns = new String[]{"emp_name", "dept_name"};
        joinQueryBaselineValues = Lists.newArrayList();
        for (int empId = 0; empId < NUM_EMPLOYEES; empId++) {
            joinQueryBaselineValues.add(new String[]{String.format("Employee %d", empId), String.format("Department %d", empId % NUM_DEPTS)});
        }
    }

    /**
     * Applies the session options shared by the mux/demux tests: a slice target of 1, broadcast
     * joins disabled, and the mux / demux exchange planner switches set as requested.
     *
     * @param z  value for {@code planner.enable_mux_exchange}
     * @param z2 value for {@code planner.enable_demux_exchange}
     * @throws Exception if any of the ALTER SESSION statements fails
     */
    public static void setupHelper(boolean z, boolean z2) throws Exception {
        final String[] sessionStatements = {
            "ALTER SESSION SET `planner.slice_target`=1",
            "ALTER SESSION SET `planner.enable_broadcast_join`=false",
            "ALTER SESSION SET `planner.enable_mux_exchange`=" + z,
            "ALTER SESSION SET `planner.enable_demux_exchange`=" + z2
        };
        for (String statement : sessionStatements) {
            test(statement);
        }
    }

    /**
     * Groups the employee table by three columns with mux enabled and demux disabled, checks that
     * the JSON plan contains exactly one Project/Mux/HashToRandom stanza whose hash expression
     * nests three {@code hash32asdouble} calls, and then verifies the query result.
     */
    @Test
    public void testGroupByMultiFields() throws Exception {
        test("ALTER SESSION SET `planner.slice_target`=1");
        test("ALTER SESSION SET `planner.enable_mux_exchange`=true");
        test("ALTER SESSION SET `planner.enable_demux_exchange`=false");

        final String query = String.format("SELECT dept_id, mng_id, some_id, count(*) as numEmployees FROM dfs.`%s` e GROUP BY dept_id, mng_id, some_id", EMPT_TABLE);

        final String jsonPlan = getPlanInString("EXPLAIN PLAN FOR " + query, ClusterFixture.EXPLAIN_PLAN_JSON);
        jsonExchangeOrderChecker(jsonPlan, false, 1, "hash32asdouble\\(.*, hash32asdouble\\(.*, hash32asdouble\\(.*\\) \\) \\) ");

        // mng_id and some_id are always 0 in the generated data, so there is one group per dept_id.
        final TestBuilder expected = testBuilder()
            .sqlQuery(query)
            .unOrdered()
            .baselineColumns("dept_id", "mng_id", "some_id", "numEmployees");
        for (long deptId = 0; deptId < NUM_DEPTS; deptId++) {
            expected.baselineValues(deptId, 0L, 0L, 25L);
        }
        expected.go();
    }

    /** Group-by query with both the mux and the demux exchange disabled. */
    @Test
    public void testGroupBy_NoMux_NoDeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = false;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Join query with both the mux and the demux exchange disabled. */
    @Test
    public void testJoin_NoMux_NoDeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = false;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /** Group-by query with the mux exchange enabled and the demux exchange disabled. */
    @Test
    public void testGroupBy_Mux_NoDeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = false;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Join query with the mux exchange enabled and the demux exchange disabled. */
    @Test
    public void testJoin_Mux_NoDeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = false;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /** Group-by query with the mux exchange disabled and the demux exchange enabled. */
    @Test
    public void testGroupBy_NoMux_DeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = true;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Join query with the mux exchange disabled and the demux exchange enabled. */
    @Test
    public void testJoin_NoMux_DeMux() throws Exception {
        final boolean muxEnabled = false;
        final boolean deMuxEnabled = true;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /** Group-by query with both the mux and the demux exchange enabled. */
    @Test
    public void testGroupBy_Mux_DeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = true;
        testGroupByHelper(muxEnabled, deMuxEnabled);
    }

    /** Join query with both the mux and the demux exchange enabled. */
    @Test
    public void testJoin_Mux_DeMux() throws Exception {
        final boolean muxEnabled = true;
        final boolean deMuxEnabled = true;
        testJoinHelper(muxEnabled, deMuxEnabled);
    }

    /**
     * Runs the shared group-by query through {@code testHelper}, expecting one mux exchange when
     * mux is enabled and one demux exchange when demux is enabled (zero otherwise).
     *
     * @param isMuxOn   whether the mux exchange is enabled
     * @param isDeMuxOn whether the demux exchange is enabled
     */
    private static void testGroupByHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
        int expectedMuxes = 0;
        if (isMuxOn) {
            expectedMuxes = 1;
        }
        int expectedDeMuxes = 0;
        if (isDeMuxOn) {
            expectedDeMuxes = 1;
        }
        testHelper(isMuxOn, isDeMuxOn, groupByQuery, expectedMuxes, expectedDeMuxes, groupByQueryBaselineColumns, groupByQueryBaselineValues);
    }

    /**
     * Runs the shared join query through {@code testHelper}, expecting two mux exchanges when mux
     * is enabled and two demux exchanges when demux is enabled (zero otherwise).
     *
     * @param z  whether the mux exchange is enabled
     * @param z2 whether the demux exchange is enabled
     */
    public static void testJoinHelper(boolean z, boolean z2) throws Exception {
        int expectedMuxes = 0;
        if (z) {
            expectedMuxes = 2;
        }
        int expectedDeMuxes = 0;
        if (z2) {
            expectedDeMuxes = 2;
        }
        testHelper(z, z2, joinQuery, expectedMuxes, expectedDeMuxes, joinQueryBaselineColumns, joinQueryBaselineValues);
    }

    /**
     * Plans and runs {@code query} with the given mux/demux settings and verifies the JSON plan,
     * the query result, and the partition-sender parallelization.
     *
     * @param isMuxOn            whether the mux exchange is enabled
     * @param isDeMuxOn          whether the demux exchange is enabled
     * @param query              SQL query to explain and execute
     * @param expectedNumMuxes   expected number of mux exchanges in the plan
     * @param expectedNumDeMuxes expected number of demux exchanges in the plan
     * @param baselineColumns    column names of the expected result
     * @param baselineValues     expected result rows, in any order
     * @throws Exception if planning or execution fails
     */
    private static void testHelper(boolean isMuxOn, boolean isDeMuxOn, String query, int expectedNumMuxes, int expectedNumDeMuxes, String[] baselineColumns, List<Object[]> baselineValues) throws Exception {
        setupHelper(isMuxOn, isDeMuxOn);

        final String plan = getPlanInString("EXPLAIN PLAN FOR " + query, ClusterFixture.EXPLAIN_PLAN_JSON);
        final int hashExprCount = StringUtils.countMatches(plan, "E_X_P_R_H_A_S_H_F_I_E_L_D");

        if (isMuxOn) {
            // With mux on, the plan is expected to mention the hash field twice per mux and once per demux.
            Assert.assertEquals("Wrong number of HashExpr fields are present in the plan", 2 * expectedNumMuxes + expectedNumDeMuxes, hashExprCount);
            jsonExchangeOrderChecker(plan, isDeMuxOn, expectedNumMuxes, "hash32asdouble\\(.*\\) ");
        } else {
            Assert.assertEquals("HashExpr on the hash column should not happen", 0, hashExprCount);
        }

        Assert.assertEquals("Wrong number of MuxExchanges are present in the plan", expectedNumMuxes, StringUtils.countMatches(plan, MUX_EXCHANGE));
        Assert.assertEquals("Wrong number of DeMuxExchanges are present in the plan", expectedNumDeMuxes, StringUtils.countMatches(plan, DEMUX_EXCHANGE));

        // Execute the query and compare against the baseline regardless of row order.
        final TestBuilder expected = testBuilder().sqlQuery(query).unOrdered().baselineColumns(baselineColumns);
        for (Object[] row : baselineValues) {
            expected.baselineValues(row);
        }
        expected.go();

        testHelperVerifyPartitionSenderParallelization(plan, isMuxOn, isDeMuxOn);
    }

    /**
     * Walks the {@code "graph"} array of a JSON plan and verifies the operator order of every
     * mux stanza: a project that defines the hash-expression field, followed by an unordered mux
     * exchange, a hash-to-random exchange keyed on that field, optionally an unordered demux
     * exchange, and finally a closing project.
     *
     * @param plan             JSON text of the physical plan
     * @param isDemuxEnabled   whether a demux exchange is expected between the hash exchange and
     *                         the closing project
     * @param expectedNumMuxes expected number of stanzas found in the plan
     * @param hashExprPattern  regular expression the hash expression of each opening project must match
     * @throws Exception if the plan text cannot be parsed as JSON
     */
    private static void jsonExchangeOrderChecker(String plan, boolean isDemuxEnabled, int expectedNumMuxes, String hashExprPattern) throws Exception {
        final JSONObject planObj = (JSONObject) new JSONParser().parse(plan);
        Assert.assertNotNull("Corrupted query plan: null", planObj);
        final JSONArray graph = (JSONArray) planObj.get("graph");
        Assert.assertNotNull("No graph array present", graph);

        final String hashExprRef = "`E_X_P_R_H_A_S_H_F_I_E_L_D`";
        // The closing project sits one operator further away when a demux exchange is present.
        final int closingProjectOffset = isDemuxEnabled ? 4 : 3;

        int position = 0;             // only advances while a stanza is being tracked
        int openingProjectPosition = 0;
        int openingProjectExprCount = 0;
        boolean insideStanza = false;
        int muxesCount = 0;

        for (Object graphElement : graph) {
            final JSONObject popObj = (JSONObject) graphElement;

            if ("project".equals(popObj.get("pop")) && popObj.containsKey("exprs")) {
                final JSONArray exprs = (JSONArray) popObj.get("exprs");
                for (Object exprElement : exprs) {
                    final JSONObject expr = (JSONObject) exprElement;
                    if (hashExprRef.equals(expr.get("ref"))) {
                        // Opening project of a stanza: remember where it is and validate its hash expression.
                        final String hashField = (String) expr.get("expr");
                        Assert.assertNotNull("HashExpr field can not be null", hashField);
                        Assert.assertTrue("HashExpr field does not match pattern", hashField.matches(hashExprPattern));
                        openingProjectPosition = position;
                        insideStanza = true;
                        muxesCount++;
                        break;
                    }
                }
                if (insideStanza) {
                    // NOTE(review): this also runs for the closing project while the stanza is still
                    // open, so the size comparison below compares the closing project with itself.
                    openingProjectExprCount = exprs.size();
                }
            }

            if (!insideStanza) {
                continue;
            }

            final int offset = position - openingProjectPosition;
            if (offset == 1) {
                Assert.assertTrue("UnorderedMux should follow Project with HashExpr", MUX_EXCHANGE_CONST.equals(popObj.get("pop")));
            }
            if (offset == 2) {
                Assert.assertTrue("HashToRandomExchange should follow UnorderedMux which should follow Project with HashExpr", HASH_EXCHANGE.equals(popObj.get("pop")));
                Assert.assertTrue("HashToRandomExchnage should use hashExpr", hashExprRef.equals(popObj.get("expr")));
            }
            if (isDemuxEnabled && offset == 3) {
                Assert.assertTrue("UnorderdDemuxExchange should follow HashToRandomExchange", DEMUX_EXCHANGE_CONST.equals(popObj.get("pop")));
                Assert.assertTrue("UnorderdDemuxExchange should use hashExpr", hashExprRef.equals(popObj.get("expr")));
            }
            if (offset == closingProjectOffset) {
                Assert.assertTrue("Should be project without hashexpr", "project".equals(popObj.get("pop")));
                final JSONArray closingExprs = (JSONArray) popObj.get("exprs");
                Assert.assertNotNull("Project should have some fields", closingExprs);
                Assert.assertEquals("Number of fields in closing project should be one less then in starting project", openingProjectExprCount, closingExprs.size());

                // Reset the tracking state so that a following stanza can be detected.
                openingProjectPosition = 0;
                insideStanza = false;
                openingProjectExprCount = 0;
            }
            position++;
        }

        Assert.assertEquals("Number of Project/Mux/HashExchange/... ", expectedNumMuxes, muxesCount);
    }

    /**
     * Re-parallelizes the given plan and verifies how fragments containing a hash partition
     * sender were assigned to drillbits.
     *
     * <p>Major fragments are collected by the type of their sending exchange; the fragments
     * produced by the parallelizer whose JSON mentions {@code hash-partition-sender} are grouped
     * by major fragment id and then checked with {@code verifyAssignment}.
     *
     * @param plan      JSON text of the physical plan
     * @param isMuxOn   when {@code true}, the major fragments sending through a hash-to-random
     *                  exchange are verified
     * @param isDeMuxOn when {@code true}, the major fragments sending through an unordered demux
     *                  exchange are verified
     * @throws Exception if the plan cannot be read or parallelized
     */
    private static void testHelperVerifyPartitionSenderParallelization(String plan, boolean isMuxOn, boolean isDeMuxOn) throws Exception {
        final DrillbitContext drillbitContext = getDrillbitContext();
        final PhysicalPlanReader planReader = drillbitContext.getPlanReader();
        final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);

        final List<Integer> deMuxFragments = new LinkedList<>();
        final List<Integer> htrFragments = new LinkedList<>();
        final PlanningSet planningSet = new PlanningSet();

        // Wrappers must be initialized before major fragment ids can be looked up in the planning set.
        PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);
        findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);

        final QueryWorkUnit workUnit = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(), UserBitShared.QueryId.getDefaultInstance(), drillbitContext.getBits(), rootFragment, USER_SESSION, Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"));
        workUnit.applyPlan(planReader);

        // Major fragment id -> endpoints assigned to its minor fragments that contain a partition sender.
        final ArrayListMultimap<Integer, CoordinationProtos.DrillbitEndpoint> partitionSenderMap = ArrayListMultimap.create();
        for (BitControl.PlanFragment planFragment : workUnit.getFragments()) {
            if (planFragment.getFragmentJson().contains("hash-partition-sender")) {
                partitionSenderMap.put(planFragment.getHandle().getMajorFragmentId(), planFragment.getAssignment());
            }
        }

        if (isMuxOn) {
            verifyAssignment(htrFragments, partitionSenderMap);
        }
        if (isDeMuxOn) {
            verifyAssignment(deMuxFragments, partitionSenderMap);
        }
    }

    /**
     * Recursively walks the fragment tree and records the major fragment id of every fragment
     * whose sending exchange is an unordered demux exchange or a hash-to-random exchange.
     *
     * @param frag           fragment to inspect; {@code null} is ignored
     * @param planningSet    planning set used to look up the major fragment id of a fragment
     * @param deMuxFragments receives ids of fragments sending through an {@link UnorderedDeMuxExchange}
     * @param htrFragments   receives ids of fragments sending through a {@link HashToRandomExchange}
     */
    private static void findFragmentsWithPartitionSender(Fragment frag, PlanningSet planningSet, List<Integer> deMuxFragments, List<Integer> htrFragments) {
        if (frag == null) {
            return;
        }

        final Exchange sender = frag.getSendingExchange();
        if (sender != null) {
            // Looked up for every sending exchange, exactly as before, whatever its type.
            final int majorFragmentId = planningSet.get(frag).getMajorFragmentId();
            if (sender instanceof UnorderedDeMuxExchange) {
                deMuxFragments.add(majorFragmentId);
            } else if (sender instanceof HashToRandomExchange) {
                htrFragments.add(majorFragmentId);
            }
        }

        for (Fragment.ExchangeFragmentPair pair : frag.getReceivingExchangePairs()) {
            findFragmentsWithPartitionSender(pair.getNode(), planningSet, deMuxFragments, htrFragments);
        }
    }

    /**
     * Verifies the endpoint assignment of every listed major fragment: it must have at least one
     * and at most {@link #CLUSTER_SIZE} partition senders, and no endpoint may appear twice.
     *
     * @param fragmentList       major fragment ids to verify; must not be empty
     * @param partitionSenderMap endpoints of the partition-sender fragments, keyed by major fragment id
     */
    private static void verifyAssignment(List<Integer> fragmentList, ArrayListMultimap<Integer, CoordinationProtos.DrillbitEndpoint> partitionSenderMap) {
        Assert.assertFalse("No major fragments to verify", fragmentList.isEmpty());
        for (Integer majorFragmentId : fragmentList) {
            final List<CoordinationProtos.DrillbitEndpoint> assignments = partitionSenderMap.get(majorFragmentId);
            Assert.assertNotNull(String.format("No assignments recorded for major fragment [%d]", majorFragmentId), assignments);
            Assert.assertFalse(String.format("Major fragment [%d] has no partition senders", majorFragmentId), assignments.isEmpty());
            Assert.assertTrue(String.format("Number of partition senders in major fragment [%d] is more than expected", majorFragmentId), CLUSTER_SIZE >= assignments.size());
            // A set drops duplicates, so equal sizes mean every endpoint hosts at most one sender.
            Assert.assertTrue("Some endpoints have more than one fragment that has ParitionSender", ImmutableSet.copyOf(assignments).size() == assignments.size());
        }
    }
}
