package org.apache.drill.exec.physical.impl.join;

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectnessBatchProcessing.class */
public class TestLateralJoinCorrectnessBatchProcessing extends SubOperatorTest {
    private static OperatorContext operatorContext;
    private static TupleMetadata leftSchema;
    private static TupleMetadata rightSchema;
    private static TupleMetadata expectedSchema;
    private static TupleMetadata expectedSchemaLeftJoin;
    private static RowSet.SingleRowSet emptyLeftRowSet;
    private static RowSet.SingleRowSet nonEmptyLeftRowSet;
    private static RowSet.SingleRowSet emptyRightRowSet;
    private static RowSet.SingleRowSet nonEmptyRightRowSet;
    private static LateralJoinPOP ljPopConfig;
    private static final List<VectorContainer> leftContainer = new ArrayList(5);
    private static final List<RecordBatch.IterOutcome> leftOutcomes = new ArrayList(5);
    private static final List<VectorContainer> rightContainer = new ArrayList(5);
    private static final List<RecordBatch.IterOutcome> rightOutcomes = new ArrayList(5);

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        operatorContext = fixture.newOperatorContext(new MockStorePOP((PhysicalOperator) null));
        ljPopConfig = new LateralJoinPOP((PhysicalOperator) null, (PhysicalOperator) null, JoinRelType.INNER, "$drill_implicit_field$", Lists.newArrayList());
        leftSchema = new SchemaBuilder().add("id_left", TypeProtos.MinorType.INT).add("cost_left", TypeProtos.MinorType.INT).add("name_left", TypeProtos.MinorType.VARCHAR).buildSchema();
        emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build();
        rightSchema = new SchemaBuilder().add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT).add("id_right", TypeProtos.MinorType.INT).add("cost_right", TypeProtos.MinorType.INT).add("name_right", TypeProtos.MinorType.VARCHAR).buildSchema();
        emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
        expectedSchema = new SchemaBuilder().add("id_left", TypeProtos.MinorType.INT).add("cost_left", TypeProtos.MinorType.INT).add("name_left", TypeProtos.MinorType.VARCHAR).add("id_right", TypeProtos.MinorType.INT).add("cost_right", TypeProtos.MinorType.INT).add("name_right", TypeProtos.MinorType.VARCHAR).buildSchema();
        expectedSchemaLeftJoin = new SchemaBuilder().add("id_left", TypeProtos.MinorType.INT).add("cost_left", TypeProtos.MinorType.INT).add("name_left", TypeProtos.MinorType.VARCHAR).add("id_right", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL).add("cost_right", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL).add("name_right", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL).buildSchema();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        operatorContext.close();
        emptyLeftRowSet.clear();
        emptyRightRowSet.clear();
    }

    @Before
    public void beforeTest() throws Exception {
        nonEmptyLeftRowSet = fixture.rowSetBuilder(leftSchema).addRow(new Object[]{1, 10, "item1"}).addRow(new Object[]{2, 20, "item2"}).addRow(new Object[]{3, 30, "item3"}).addRow(new Object[]{4, 40, "item4"}).build();
        nonEmptyRightRowSet = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{2, 22, 220, "item22"}).addRow(new Object[]{3, 33, 330, "item33"}).addRow(new Object[]{4, 44, 440, "item44"}).build();
    }

    @After
    public void afterTest() throws Exception {
        nonEmptyLeftRowSet.clear();
        leftContainer.clear();
        leftOutcomes.clear();
        nonEmptyRightRowSet.clear();
        rightContainer.clear();
        rightOutcomes.clear();
    }

    @Test
    public void testLeftAndRightAllMatchingRows_SingleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(nonEmptyRightRowSet.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());
            new RowSetComparison(build).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightAllMatchingRows_MultipleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{4, 44, 440, "item44"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(nonEmptyRightRowSet.container());
        rightContainer.add(build.container());
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).build();
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount() + build.rowCount());
            new RowSetComparison(build2).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightAllMatchingRows_SecondBatch_Empty() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(nonEmptyRightRowSet.container());
        rightContainer.add(emptyRightRowSet.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());
            new RowSetComparison(build).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightWithMissingRows_SingleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{4, 44, 440, "item44"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == build.rowCount());
            new RowSetComparison(build2).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightWithMissingRows_LeftJoin_SingleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{4, 44, 440, "item44"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", null, null, null}).addRow(new Object[]{3, 30, "item3", null, null, null}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(new LateralJoinPOP((PhysicalOperator) null, (PhysicalOperator) null, JoinRelType.LEFT, "$drill_implicit_field$", Lists.newArrayList()), fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());
            new RowSetComparison(build2).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightWithInitialMissingRows_MultipleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{2, 22, 220, "item22"}).addRow(new Object[]{3, 33, 330, "item33"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{4, 44, 440, "item44_1"}).addRow(new Object[]{4, 44, 440, "item44_2"}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_1"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_2"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightContainer.add(build2.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == build.rowCount() + build2.rowCount());
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightWithInitialMissingRows_LeftJoin_MultipleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{2, 22, 220, "item22"}).addRow(new Object[]{3, 33, 330, "item33"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{4, 44, 440, "item44_1"}).addRow(new Object[]{4, 44, 440, "item44_2"}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{1, 10, "item1", null, null, null}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_1"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_2"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightContainer.add(build2.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(new LateralJoinPOP((PhysicalOperator) null, (PhysicalOperator) null, JoinRelType.LEFT, "$drill_implicit_field$", Lists.newArrayList()), fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == (1 + build.rowCount()) + build2.rowCount());
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightWithLastMissingRows_MultipleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{2, 22, 220, "item22"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{3, 33, 330, "item33_1"}).addRow(new Object[]{3, 33, 330, "item33_2"}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33_1"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33_2"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightContainer.add(build2.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == build.rowCount() + build2.rowCount());
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRightWithLastMissingRows_LeftJoin_MultipleBatch() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{2, 22, 220, "item22"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{3, 33, 330, "item33_1"}).addRow(new Object[]{3, 33, 330, "item33_2"}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33_1"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33_2"}).addRow(new Object[]{4, 40, "item4", null, null, null}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightContainer.add(build2.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(new LateralJoinPOP((PhysicalOperator) null, (PhysicalOperator) null, JoinRelType.LEFT, "$drill_implicit_field$", Lists.newArrayList()), fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == (1 + build.rowCount()) + build2.rowCount());
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build3.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRight_OutputFull_InRightBatchMiddle() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{4, 44, 440, "item44_2_1"}).addRow(new Object[]{4, 44, 440, "item44_2_2"}).addRow(new Object[]{4, 44, 440, "item44_2_3"}).addRow(new Object[]{4, 44, 440, "item44_2_4"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_2_1"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_2_2"}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_2_3"}).addRow(new Object[]{4, 40, "item4", 44, 440, "item44_2_4"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(nonEmptyRightRowSet.container());
        rightContainer.add(build.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.OK);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        lateralJoinBatch.setMaxOutputRowCount(6);
        lateralJoinBatch.setUseMemoryManager(false);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == 6);
            new RowSetComparison(build2).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            VectorAccessibleUtilities.clear(lateralJoinBatch);
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == (nonEmptyRightRowSet.rowCount() + build.rowCount()) - 6);
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build2.clear();
            build3.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build2.clear();
            build3.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRight_OutputFull_WithPendingLeftRow() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{2, 22, 220, "item22"}).addRow(new Object[]{3, 33, 330, "item33"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(expectedSchema).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", 33, 330, "item33"}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        lateralJoinBatch.setMaxOutputRowCount(3);
        lateralJoinBatch.setUseMemoryManager(false);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == build.rowCount());
            new RowSetComparison(build2).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            throw th;
        }
    }

    @Test
    public void testLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{2, 22, 220, "item22"}).build();
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", null, null, null}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{4, 40, "item4", null, null, null}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(new LateralJoinPOP((PhysicalOperator) null, (PhysicalOperator) null, JoinRelType.LEFT, "$drill_implicit_field$", Lists.newArrayList()), fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        lateralJoinBatch.setMaxOutputRowCount(3);
        lateralJoinBatch.setUseMemoryManager(false);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == 3);
            new RowSetComparison(build2).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            VectorAccessibleUtilities.clear(lateralJoinBatch);
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount() - 3);
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            build3.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            build3.clear();
            throw th;
        }
    }

    @Test
    public void testMultipleLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
        RowSet.SingleRowSet build = fixture.rowSetBuilder(leftSchema).addRow(new Object[]{5, 50, "item5"}).addRow(new Object[]{6, 60, "item6"}).build();
        leftContainer.add(nonEmptyLeftRowSet.container());
        leftContainer.add(build.container());
        leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        leftOutcomes.add(RecordBatch.IterOutcome.OK);
        MockRecordBatch mockRecordBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
        RowSet.SingleRowSet build2 = fixture.rowSetBuilder(rightSchema).addRow(new Object[]{1, 11, 110, "item11"}).addRow(new Object[]{2, 22, 220, "item22"}).build();
        RowSet.SingleRowSet build3 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{1, 10, "item1", 11, 110, "item11"}).addRow(new Object[]{2, 20, "item2", 22, 220, "item22"}).addRow(new Object[]{3, 30, "item3", null, null, null}).build();
        RowSet.SingleRowSet build4 = fixture.rowSetBuilder(expectedSchemaLeftJoin).addRow(new Object[]{4, 40, "item4", null, null, null}).addRow(new Object[]{5, 50, "item5", null, null, null}).addRow(new Object[]{6, 60, "item6", null, null, null}).build();
        rightContainer.add(emptyRightRowSet.container());
        rightContainer.add(build2.container());
        rightContainer.add(emptyRightRowSet.container());
        rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
        MockRecordBatch mockRecordBatch2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
        LateralJoinBatch lateralJoinBatch = new LateralJoinBatch(new LateralJoinPOP((PhysicalOperator) null, (PhysicalOperator) null, JoinRelType.LEFT, "$drill_implicit_field$", Lists.newArrayList()), fixture.getFragmentContext(), mockRecordBatch, mockRecordBatch2);
        lateralJoinBatch.setMaxOutputRowCount(3);
        lateralJoinBatch.setUseMemoryManager(false);
        try {
            Assert.assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == lateralJoinBatch.next());
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == 3);
            new RowSetComparison(build3).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            VectorAccessibleUtilities.clear(lateralJoinBatch);
            Assert.assertTrue(RecordBatch.IterOutcome.OK == lateralJoinBatch.next());
            Assert.assertTrue(lateralJoinBatch.getRecordCount() == (nonEmptyLeftRowSet.rowCount() + build.rowCount()) - 3);
            new RowSetComparison(build4).verify(DirectRowSet.fromContainer(lateralJoinBatch.getContainer()));
            Assert.assertTrue(RecordBatch.IterOutcome.NONE == lateralJoinBatch.next());
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            build3.clear();
            build4.clear();
        } catch (Throwable th) {
            lateralJoinBatch.close();
            mockRecordBatch.close();
            mockRecordBatch2.close();
            build.clear();
            build2.clear();
            build3.clear();
            build4.clear();
            throw th;
        }
    }
}
