package org.apache.drill.exec.physical.impl.agg;

import java.util.ArrayList;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.test.PerformanceTool;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({OperatorTest.class})
/* loaded from: input_file:org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.class */
public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    // Expected output schema for the grouped aggregations: "name" (VARCHAR) plus a
    // nullable "total_sum" (BIGINT). Assigned in setUpBeforeClass2().
    protected static TupleMetadata resultSchema;
    // Schema holding only the nullable "total_sum" (BIGINT) column, i.e. without a
    // group-by key. Assigned in setUpBeforeClass2().
    protected static TupleMetadata resultSchemaNoGroupBy;

    /**
     * Builds the two result schemas shared by the tests of this class: one with the
     * {@code name} grouping key followed by the nullable {@code total_sum} column,
     * and one with the nullable {@code total_sum} column only.
     *
     * @throws Exception declared for the JUnit lifecycle signature
     */
    @BeforeClass
    public static void setUpBeforeClass2() throws Exception {
        // Aggregate-only schema (no grouping key).
        resultSchemaNoGroupBy = new SchemaBuilder()
            .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
            .buildSchema();

        // Grouping key + aggregate schema.
        resultSchema = new SchemaBuilder()
            .add("name", TypeProtos.MinorType.VARCHAR)
            .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
            .buildSchema();
    }

    /**
     * Feeds the streaming aggregate three empty batches whose outcomes are
     * {@code OK_NEW_SCHEMA}, {@code OK_NEW_SCHEMA} and {@code EMIT}, and expects the
     * operator to return {@code OK_NEW_SCHEMA} twice, then an {@code EMIT} batch with
     * zero records, then {@code NONE}.
     */
    @Test
    public void t1_testStreamingAggrEmptyBatchEmitOutcome() {
        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        // Group by name_left (as "name"), summing id_left + cost_left (as "total_sum").
        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());
    }

    /**
     * Feeds an empty batch, the shared non-empty batch and a second non-empty batch
     * (the last one delivered with {@code EMIT}) and expects all groups to come back
     * in a single four-row batch on the second {@code OK_NEW_SCHEMA}, followed by
     * {@code EMIT} and {@code NONE}.
     */
    @Test
    public void t2_testStreamingAggrNonEmptyBatchEmitOutcome() {
        RowSet.SingleRowSet secondInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{4, 40, "item4"})
            .build();
        // The "item1" row originates from the inherited nonEmptyInputRowSet
        // (defined outside this file; presumably a single item1 row - verify in the base class).
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item1", 11L})
            .addRow(new Object[]{"item13", 286L})
            .addRow(new Object[]{"item2", 44L})
            .addRow(new Object[]{"item4", 44L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(secondInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(4L, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        secondInput.clear();
        expected.clear();
    }

    /**
     * Feeds an empty batch with {@code EMIT} followed by a non-empty batch with
     * {@code EMIT}. Expects the first {@code EMIT} output to be empty and the second
     * to carry the three aggregated groups, followed by {@code NONE}.
     */
    @Test
    public void t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
        RowSet.SingleRowSet dataBatch = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{0, 1300, "item13"})
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{0, 2000, "item2"})
            .addRow(new Object[]{4, 40, "item4"})
            .addRow(new Object[]{0, 4000, "item4"})
            .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item13", 1443L})
            .addRow(new Object[]{"item2", 2022L})
            .addRow(new Object[]{"item4", 4044L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(dataBatch.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(3L, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        dataBatch.clear();
        expected.clear();
    }

    /**
     * Feeds three consecutive empty {@code EMIT} batches followed by a non-empty
     * {@code EMIT} batch. Expects three empty {@code EMIT} outputs, then one
     * {@code EMIT} output carrying the three aggregated groups, then {@code NONE}.
     */
    @Test
    public void t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
        RowSet.SingleRowSet dataBatch = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{0, 0, "item13"})
            .addRow(new Object[]{1, 33000, "item13"})
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{0, 0, "item2"})
            .addRow(new Object[]{1, 11000, "item2"})
            .addRow(new Object[]{4, 40, "item4"})
            .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item13", 33144L})
            .addRow(new Object[]{"item2", 11023L})
            .addRow(new Object[]{"item4", 44L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(dataBatch.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(3L, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        dataBatch.clear();
        expected.clear();
    }

    /**
     * Checks that aggregation state is reset after the first {@code EMIT}: the
     * output for the data arriving after that {@code EMIT} contains only the groups
     * of the later batch ({@code item2}, {@code item3}) and nothing carried over from
     * the first result.
     */
    @Test
    public void t5_testStreamingAgrResetsAfterFirstEmitOutcome() {
        RowSet.SingleRowSet secondInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{3, 30, "item3"})
            .build();
        // Result expected from the inherited nonEmptyInputRowSet (defined outside this file).
        RowSet.SingleRowSet expectedFirst = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item1", 11L})
            .build();
        RowSet.SingleRowSet expectedSecond = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item2", 44L})
            .addRow(new Object[]{"item3", 330L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(secondInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1L, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirst).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(2L, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecond).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        secondInput.clear();
        expectedFirst.clear();
        expectedSecond.clear();
    }

    /**
     * Feeds data with {@code OK} after an {@code EMIT} and then lets the upstream
     * run out. Expects the trailing data to be aggregated into a four-row batch
     * returned with {@code OK}, followed by {@code NONE}.
     */
    @Test
    public void t6_testStreamingAggrOkFollowedByNone() {
        RowSet.SingleRowSet secondInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{3, 30, "item3"})
            .addRow(new Object[]{4, 40, "item4"})
            .addRow(new Object[]{4, 40, "item4"})
            .addRow(new Object[]{5, 50, "item5"})
            .addRow(new Object[]{5, 50, "item5"})
            .build();
        // Result expected from the inherited nonEmptyInputRowSet (defined outside this file).
        RowSet.SingleRowSet expectedFirst = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item1", 11L})
            .build();
        RowSet.SingleRowSet expectedSecond = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item2", 22L})
            .addRow(new Object[]{"item3", 33L})
            .addRow(new Object[]{"item4", 88L})
            .addRow(new Object[]{"item5", 110L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(secondInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1L, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirst).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.OK, aggBatch.next());
        Assert.assertEquals(4L, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecond).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        secondInput.clear();
        expectedFirst.clear();
        expectedSecond.clear();
    }

    /**
     * Feeds three consecutive {@code EMIT} batches (empty, two rows, empty) after the
     * initial data and checks only the outcome and record count of each output:
     * 1 row with {@code OK_NEW_SCHEMA}, then {@code EMIT} outputs of 0, 2 and 0 rows,
     * then {@code NONE}.
     */
    @Test
    public void t7_testStreamingAggrMultipleEMITOutcome() {
        RowSet.SingleRowSet secondInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 20, "item2"})
            .addRow(new Object[]{3, 30, "item3"})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(secondInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(2L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row set.
        secondInput.clear();
    }

    /**
     * Feeds several batches with {@code OK} before a final empty {@code EMIT} batch
     * and expects their groups to be combined into a single two-row output batch,
     * followed by an empty {@code EMIT} output and {@code NONE}.
     */
    @Test
    public void t8_testStreamingAggrMultipleInputToSingleOutputBatch() {
        RowSet.SingleRowSet secondInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 20, "item2"})
            .build();
        // The "item1" row originates from the inherited nonEmptyInputRowSet
        // (defined outside this file).
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item1", 11L})
            .addRow(new Object[]{"item2", 22L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(secondInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(2L, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0L, aggBatch.getRecordCount());
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers of both locally built row sets; the expected row set
        // was previously left uncleared.
        secondInput.clear();
        expected.clear();
    }

    /**
     * Caps the operator's output at two rows per batch and feeds three groups
     * ending with an {@code EMIT}. Expects the result to be split in two: a two-row
     * batch ({@code item1}, {@code item2}) and a one-row {@code EMIT} batch
     * ({@code item3}), followed by {@code NONE}.
     */
    @Test
    public void t8_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
        // The cost values are arbitrary: the aggregate is count(cost_left), so only the
        // number of rows per (name, id) group matters.
        RowSet.SingleRowSet input2 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{1, 20, "item1"})
            .build();
        RowSet.SingleRowSet input3 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 30, "item2"})
            .build();
        RowSet.SingleRowSet input4 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 40, "item2"})
            .addRow(new Object[]{2, 50, "item2"})
            .addRow(new Object[]{2, 60, "item2"})
            .addRow(new Object[]{2, 70, "item2"})
            .addRow(new Object[]{3, 100, "item3"})
            .addRow(new Object[]{3, 200, "item3"})
            // Plain literal instead of a constant borrowed from an unrelated performance tool.
            .addRow(new Object[]{3, 300, "item3"})
            .addRow(new Object[]{3, 400, "item3"})
            .build();

        TupleMetadata countSchema = new SchemaBuilder()
            .add("name", TypeProtos.MinorType.VARCHAR)
            .add("id", TypeProtos.MinorType.INT)
            .add("total_count", TypeProtos.MinorType.BIGINT)
            .buildSchema();
        RowSet.SingleRowSet expectedFirst = operatorFixture.rowSetBuilder(countSchema)
            .addRow(new Object[]{"item1", 1, 2L})
            .addRow(new Object[]{"item2", 2, 5L})
            .build();
        RowSet.SingleRowSet expectedSecond = operatorFixture.rowSetBuilder(countSchema)
            .addRow(new Object[]{"item3", 3, 4L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(input2.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(input3.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(input4.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        // Group by (name_left, id_left), counting cost_left.
        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name", "id_left", "id"),
            parseExprs("count(cost_left)", "total_count"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());
        aggBatch.setMaxOutputRowCount(2);

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(2L, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirst).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1L, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecond).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        input2.clear();
        input3.clear();
        input4.clear();
        expectedFirst.clear();
        expectedSecond.clear();
    }

    /**
     * Caps the operator's output at two rows per batch and feeds two
     * {@code OK ... EMIT} sequences. Expects each sequence's result to be split into
     * a two-row batch followed by a one-row {@code EMIT} batch, with {@code NONE} at
     * the end.
     */
    @Test
    public void t8_2_testStreamingAggr_Inputs_OK_EMIT_SplitToMultipleOutputBatch() {
        // The cost values are arbitrary: the aggregate is count(cost_left), so only the
        // number of rows per (name, id) group matters.

        // First OK ... EMIT sequence.
        RowSet.SingleRowSet input2 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{1, 20, "item1"})
            .build();
        RowSet.SingleRowSet input3 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 30, "item2"})
            .build();
        RowSet.SingleRowSet input4 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 40, "item2"})
            .addRow(new Object[]{2, 50, "item2"})
            .addRow(new Object[]{2, 60, "item2"})
            .addRow(new Object[]{2, 70, "item2"})
            .addRow(new Object[]{3, 100, "item3"})
            .addRow(new Object[]{3, 200, "item3"})
            // Plain literal instead of a constant borrowed from an unrelated performance tool.
            .addRow(new Object[]{3, 300, "item3"})
            .addRow(new Object[]{3, 400, "item3"})
            .build();

        // Second OK ... EMIT sequence.
        RowSet.SingleRowSet input5 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 40, "item2"})
            .build();
        RowSet.SingleRowSet input6 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 50, "item2"})
            .build();
        RowSet.SingleRowSet input7 = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{3, 130, "item3"})
            .addRow(new Object[]{3, 130, "item3"})
            .addRow(new Object[]{4, 140, "item4"})
            .addRow(new Object[]{4, 140, "item4"})
            .build();

        TupleMetadata countSchema = new SchemaBuilder()
            .add("name", TypeProtos.MinorType.VARCHAR)
            .add("id", TypeProtos.MinorType.INT)
            .add("total_count", TypeProtos.MinorType.BIGINT)
            .buildSchema();
        RowSet.SingleRowSet expectedFirst = operatorFixture.rowSetBuilder(countSchema)
            .addRow(new Object[]{"item1", 1, 2L})
            .addRow(new Object[]{"item2", 2, 5L})
            .build();
        RowSet.SingleRowSet expectedSecond = operatorFixture.rowSetBuilder(countSchema)
            .addRow(new Object[]{"item3", 3, 4L})
            .build();
        RowSet.SingleRowSet expectedThird = operatorFixture.rowSetBuilder(countSchema)
            .addRow(new Object[]{"item2", 2, 2L})
            .addRow(new Object[]{"item3", 3, 2L})
            .build();
        RowSet.SingleRowSet expectedFourth = operatorFixture.rowSetBuilder(countSchema)
            .addRow(new Object[]{"item4", 4, 2L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(input2.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(input3.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(input4.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(input5.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(input6.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(input7.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        // Group by (name_left, id_left), counting cost_left.
        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name", "id_left", "id"),
            parseExprs("count(cost_left)", "total_count"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());
        aggBatch.setMaxOutputRowCount(2);

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(2L, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirst).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1L, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecond).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.OK, aggBatch.next());
        Assert.assertEquals(2L, aggBatch.getRecordCount());
        new RowSetComparison(expectedThird).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1L, aggBatch.getRecordCount());
        new RowSetComparison(expectedFourth).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        input2.clear();
        input3.clear();
        input4.clear();
        input5.clear();
        input6.clear();
        input7.clear();
        expectedFirst.clear();
        expectedSecond.clear();
        expectedThird.clear();
        expectedFourth.clear();
    }

    /**
     * Feeds alternating empty and non-empty batches that are all delivered with
     * {@code OK} (no {@code EMIT}). Expects every group to be returned in one
     * six-row batch on the second {@code OK_NEW_SCHEMA}, followed by {@code NONE}.
     */
    @Test
    public void t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
        RowSet.SingleRowSet secondInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{2, 20, "item1"})
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{13, 130, "item13"})
            .addRow(new Object[]{130, 1300, "item130"})
            .addRow(new Object[]{0, 0, "item130"})
            .build();
        RowSet.SingleRowSet thirdInput = operatorFixture.rowSetBuilder(inputSchema)
            .addRow(new Object[]{23, 230, "item23"})
            .addRow(new Object[]{3, 33, "item3"})
            .addRow(new Object[]{7, 70, "item7"})
            .addRow(new Object[]{17, 170, "item7"})
            .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchema)
            .addRow(new Object[]{"item1", 33L})
            .addRow(new Object[]{"item13", 429L})
            .addRow(new Object[]{"item130", 1430L})
            .addRow(new Object[]{"item23", 253L})
            .addRow(new Object[]{"item3", 36L})
            .addRow(new Object[]{"item7", 264L})
            .build();

        // Upstream batches, each paired with the outcome it is delivered with.
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(secondInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(thirdInput.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(6L, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());

        // Release the buffers held by the locally built row sets.
        secondInput.clear();
        thirdInput.clear();
        expected.clear();
    }

    /**
     * Feeds a single empty batch with {@code OK_NEW_SCHEMA} and expects the operator
     * to return {@code OK_NEW_SCHEMA} followed directly by {@code NONE}.
     */
    @Test
    public void t10_testStreamingAggrWithEmptyDataSet() {
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

        StreamingAggregate aggConfig = new StreamingAggregate(
            (PhysicalOperator) null,
            parseExprs("name_left", "name"),
            parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(
            operatorFixture.getFragmentContext(), opContext,
            inputContainer, inputOutcomes,
            emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(
            aggConfig, upstream, operatorFixture.getFragmentContext());

        // assertEquals instead of assertTrue(a == b): a failure reports the actual outcome.
        Assert.assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(RecordBatch.IterOutcome.NONE, aggBatch.next());
    }

    /**
     * Group-by on (name, id) with {@code count(cost_left)} while the operator's
     * maximum output row count is set to 2. Three groups are expected, so the
     * test asserts that they arrive in two output batches: two rows with
     * OK_NEW_SCHEMA, then one row with OK, then NONE.
     */
    @Test
    public void t10_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
        RowSet.SingleRowSet item1Rows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(1, 20, "item1")
                .build();
        RowSet.SingleRowSet item2Rows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 30, "item2")
                .build();
        // The cost column is only counted, never summed, so its values are arbitrary.
        // A plain literal is used for the third item3 row instead of borrowing an
        // unrelated constant from another test utility.
        RowSet.SingleRowSet mixedRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 40, "item2")
                .addRow(2, 50, "item2")
                .addRow(2, 60, "item2")
                .addRow(2, 70, "item2")
                .addRow(3, 100, "item3")
                .addRow(3, 200, "item3")
                .addRow(3, 300, "item3")
                .addRow(3, 400, "item3")
                .build();

        TupleMetadata countSchema = new SchemaBuilder()
                .add("name", TypeProtos.MinorType.VARCHAR)
                .add("id", TypeProtos.MinorType.INT)
                .add("total_count", TypeProtos.MinorType.BIGINT)
                .buildSchema();
        // Only one item1 row is built here; the expected count of 2 presumably
        // includes a row from the shared nonEmptyInputRowSet (defined outside this method).
        RowSet.SingleRowSet expectedFirstBatch = operatorFixture.rowSetBuilder(countSchema)
                .addRow("item1", 1, 2L)
                .addRow("item2", 2, 5L)
                .build();
        RowSet.SingleRowSet expectedSecondBatch = operatorFixture.rowSetBuilder(countSchema)
                .addRow("item3", 3, 4L)
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputContainer.add(nonEmptyInputRowSet.container());
        inputContainer.add(item1Rows.container());
        inputContainer.add(item2Rows.container());
        inputContainer.add(mixedRows.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputOutcomes.add(RecordBatch.IterOutcome.OK);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                parseExprs("name_left", "name", "id_left", "id"),
                parseExprs("count(cost_left)", "total_count"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());
        // Cap each output batch at two rows so the three groups must be split.
        aggBatch.setMaxOutputRowCount(2);

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(2, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirstBatch).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.OK, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecondBatch).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        item1Rows.clear();
        item2Rows.clear();
        mixedRows.clear();
        expectedFirstBatch.clear();
        expectedSecondBatch.clear();
    }

    /**
     * Aggregation without group-by keys over three empty batches whose outcomes
     * are OK_NEW_SCHEMA, OK_NEW_SCHEMA and EMIT. The test asserts the output
     * sequence OK_NEW_SCHEMA, OK_NEW_SCHEMA, EMIT (with zero records), NONE.
     */
    @Test
    public void t11_testStreamingAggrEmptyBatchEmitOutcome() {
        for (int i = 0; i < 3; i++) {
            inputContainer.add(emptyInputRowSet.container());
        }
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        // An empty key list means there is no group-by clause.
        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());
        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());
    }

    /**
     * Aggregation without group-by keys where a non-empty batch arrives with an
     * EMIT outcome. The test asserts that the single aggregated row is returned
     * with the second OK_NEW_SCHEMA, followed by EMIT and then NONE.
     */
    @Test
    public void t12_testStreamingAggrNonEmptyBatchEmitOutcome() {
        // id + cost over these rows adds up to 374; the expected 385 presumably also
        // includes the shared nonEmptyInputRowSet (defined outside this method).
        RowSet.SingleRowSet extraRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(13, 130, "item13")
                .addRow(13, 130, "item13")
                .addRow(2, 20, "item2")
                .addRow(2, 20, "item2")
                .addRow(4, 40, "item4")
                .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(385L)
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(extraRows.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        extraRows.clear();
        expected.clear();
    }

    /**
     * Aggregation without group-by keys where an empty EMIT batch is followed by
     * a non-empty EMIT batch. The test asserts that the first EMIT carries zero
     * records and the second EMIT carries one row holding the sum 7509.
     */
    @Test
    public void t13_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
        // id + cost over these six rows adds up to the expected 7509.
        RowSet.SingleRowSet dataRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(13, 130, "item13")
                .addRow(0, 1300, "item13")
                .addRow(2, 20, "item2")
                .addRow(0, 2000, "item2")
                .addRow(4, 40, "item4")
                .addRow(0, 4000, "item4")
                .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(7509L)
                .build();

        for (int i = 0; i < 3; i++) {
            inputContainer.add(emptyInputRowSet.container());
        }
        inputContainer.add(dataRows.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        dataRows.clear();
        expected.clear();
    }

    /**
     * Aggregation without group-by keys where several empty EMIT batches precede
     * a non-empty EMIT batch. The test asserts the record count after every
     * output and verifies that the last EMIT carries the sum 44211.
     */
    @Test
    public void t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
        // id + cost over these seven rows adds up to the expected 44211.
        RowSet.SingleRowSet dataRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(13, 130, "item13")
                .addRow(0, 0, "item13")
                .addRow(1, 33000, "item13")
                .addRow(2, 20, "item2")
                .addRow(0, 0, "item2")
                .addRow(1, 11000, "item2")
                .addRow(4, 40, "item4")
                .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(44211L)
                .build();

        for (int i = 0; i < 5; i++) {
            inputContainer.add(emptyInputRowSet.container());
        }
        inputContainer.add(dataRows.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        for (int i = 0; i < 4; i++) {
            inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        }

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        dataRows.clear();
        expected.clear();
    }

    /**
     * Aggregation without group-by keys across two EMIT boundaries. The test
     * verifies a first result of 11 before the first EMIT and a second result
     * of 374 with the last EMIT, i.e. the second sum does not include the first.
     */
    @Test
    public void t15_testStreamingAgrResetsAfterFirstEmitOutcome() {
        // Two item2 rows followed by ten identical item3 rows: id + cost adds up to 374.
        RowSet.SingleRowSet secondRunRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 20, "item2")
                .addRow(2, 20, "item2")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .addRow(3, 30, "item3")
                .build();
        // 11 is presumably the id + cost sum of the shared nonEmptyInputRowSet — TODO confirm.
        RowSet.SingleRowSet expectedFirst = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(11L)
                .build();
        RowSet.SingleRowSet expectedSecond = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(374L)
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(secondRunRows.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirst).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecond).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        secondRunRows.clear();
        expectedSecond.clear();
        expectedFirst.clear();
    }

    /**
     * Aggregation without group-by keys where, after an EMIT boundary, the
     * remaining data arrives with OK and the input then ends. The test asserts
     * that the trailing data is returned as one row (253) with an OK outcome
     * before NONE.
     */
    @Test
    public void t16_testStreamingAggrOkFollowedByNone() {
        // id + cost over these six rows adds up to the expected 253.
        RowSet.SingleRowSet trailingRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 20, "item2")
                .addRow(3, 30, "item3")
                .addRow(4, 40, "item4")
                .addRow(4, 40, "item4")
                .addRow(5, 50, "item5")
                .addRow(5, 50, "item5")
                .build();
        // 11 is presumably the id + cost sum of the shared nonEmptyInputRowSet — TODO confirm.
        RowSet.SingleRowSet expectedFirst = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(11L)
                .build();
        RowSet.SingleRowSet expectedSecond = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(253L)
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(trailingRows.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expectedFirst).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.OK, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expectedSecond).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        trailingRows.clear();
        expectedSecond.clear();
        expectedFirst.clear();
    }

    /**
     * Aggregation without group-by keys over an input containing three
     * consecutive EMIT outcomes. Only outcomes and record counts are asserted;
     * the aggregated values themselves are not compared.
     */
    @Test
    public void t17_testStreamingAggrMultipleEMITOutcome() {
        RowSet.SingleRowSet secondEmitRows = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 20, "item2")
                .addRow(3, 30, "item3")
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(nonEmptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(secondEmitRows.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        secondEmitRows.clear();
    }

    /**
     * Aggregation without group-by keys where several input batches (empty and
     * non-empty, delivered with OK) precede a single EMIT. The test asserts
     * that all of them are folded into one output row (33), followed by an
     * empty EMIT batch and NONE.
     */
    @Test
    public void t18_testStreamingAggrMultipleInputToSingleOutputBatch() {
        // id + cost of this row is 22; the expected 33 presumably also includes the
        // shared nonEmptyInputRowSet (defined outside this method).
        RowSet.SingleRowSet extraRow = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 20, "item2")
                .build();
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(33L)
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputContainer.add(nonEmptyInputRowSet.container());
        inputContainer.add(emptyInputRowSet.container());
        inputContainer.add(extraRow.container());
        inputContainer.add(emptyInputRowSet.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        extraRow.clear();
        // Release the expected row set as well, as the sibling tests do; it was
        // previously left allocated.
        expected.clear();
    }

    /**
     * Aggregation without group-by keys over alternating empty and non-empty
     * batches that all arrive with OK (no EMIT). The test asserts that a single
     * row holding the total 2445 is produced, followed by NONE.
     */
    @Test
    public void t19_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
        // id + cost over this batch adds up to 1881.
        RowSet.SingleRowSet firstDataBatch = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 20, "item1")
                .addRow(13, 130, "item13")
                .addRow(13, 130, "item13")
                .addRow(13, 130, "item13")
                .addRow(130, 1300, "item130")
                .addRow(0, 0, "item130")
                .build();
        // id + cost over this batch adds up to 553.
        RowSet.SingleRowSet secondDataBatch = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(23, 230, "item23")
                .addRow(3, 33, "item3")
                .addRow(7, 70, "item7")
                .addRow(17, 170, "item7")
                .build();
        // The remaining 11 presumably comes from the shared nonEmptyInputRowSet — TODO confirm.
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(2445L)
                .build();

        inputContainer.add(emptyInputRowSet.container());
        inputContainer.add(nonEmptyInputRowSet.container());
        inputContainer.add(emptyInputRowSet.container());
        inputContainer.add(firstDataBatch.container());
        inputContainer.add(emptyInputRowSet.container());
        inputContainer.add(secondDataBatch.container());
        inputContainer.add(emptyInputRowSet.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        for (int i = 0; i < 5; i++) {
            inputOutcomes.add(RecordBatch.IterOutcome.OK);
        }

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());

        firstDataBatch.clear();
        secondDataBatch.clear();
        expected.clear();
    }

    /**
     * Aggregation without group-by keys over an input made of a single empty
     * batch. Unlike the group-by variant, the test expects an OK outcome
     * between OK_NEW_SCHEMA and NONE.
     */
    @Test
    public void t20_testStreamingAggrWithEmptyDataSet() {
        inputContainer.add(emptyInputRowSet.container());
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertSame(RecordBatch.IterOutcome.OK, aggBatch.next());
        Assert.assertSame(RecordBatch.IterOutcome.NONE, aggBatch.next());
    }

    /**
     * Aggregation without group-by keys over batches that carry a selection
     * vector (SV2): five row-less batches followed by a two-row batch. The
     * test asserts outcomes and record counts for five outputs and verifies
     * that the last one holds the sum 33.
     */
    @Test
    public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() {
        TupleMetadata sv2InputSchema = new SchemaBuilder()
                .add("id_left", TypeProtos.MinorType.INT)
                .add("cost_left", TypeProtos.MinorType.INT)
                .add("name_left", TypeProtos.MinorType.VARCHAR)
                .buildSchema();
        RowSet.SingleRowSet emptySv2Rows = operatorFixture.rowSetBuilder(sv2InputSchema)
                .withSv2()
                .build();
        // Only the second row is flagged true; its id + cost (3 + 30) equals the expected 33.
        // Presumably the row flagged false is left out of the selection vector — TODO confirm.
        RowSet.SingleRowSet dataSv2Rows = operatorFixture.rowSetBuilder(sv2InputSchema)
                .addSelection(false, 2, 20, "item2")
                .addSelection(true, 3, 30, "item3")
                .withSv2()
                .build();

        for (int i = 0; i < 5; i++) {
            inputContainer.add(emptySv2Rows.container());
        }
        inputContainer.add(dataSv2Rows.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        inputOutcomes.add(RecordBatch.IterOutcome.OK);
        inputOutcomes.add(RecordBatch.IterOutcome.EMIT);

        // One selection vector per input container, in the same order.
        for (int i = 0; i < 5; i++) {
            inputContainerSv2.add(emptySv2Rows.getSv2());
        }
        inputContainerSv2.add(dataSv2Rows.getSv2());

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), (OperatorContext) opContext,
                inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
                .addRow(33L)
                .build();

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        dataSv2Rows.clear();
        emptySv2Rows.clear();
        expected.clear();
    }

    /**
     * Aggregation without group-by keys over four empty batches followed by a
     * one-row batch, using OK_NEW_SCHEMA, OK_NEW_SCHEMA and three EMIT outcomes
     * and no selection vector. The test asserts outcomes and record counts for
     * five outputs and compares two of them against an expected row set.
     */
    @Test
    public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() {
        RowSet.SingleRowSet dataRow = operatorFixture.rowSetBuilder(inputSchema)
                .addRow(2, 20, "item2")
                .build();

        for (int i = 0; i < 4; i++) {
            inputContainer.add(emptyInputRowSet.container());
        }
        inputContainer.add(dataRow.container());

        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
        for (int i = 0; i < 3; i++) {
            inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
        }

        // NOTE(review): the expected row set is built without any rows, yet it is
        // compared against batches asserted to hold one record — confirm that this
        // is the intended expectation.
        RowSet.SingleRowSet expected = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy).build();

        StreamingAggregate aggConfig = new StreamingAggregate((PhysicalOperator) null,
                new ArrayList<>(),
                parseExprs("sum(id_left+cost_left)", "total_sum"));
        MockRecordBatch upstream = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
                inputContainer, inputOutcomes, inputContainer.get(0).getSchema());
        StreamingAggBatch aggBatch = new StreamingAggBatch(aggConfig, upstream, operatorFixture.getFragmentContext());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.OK_NEW_SCHEMA, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(0, aggBatch.getRecordCount());

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());
        new RowSetComparison(expected).verify(DirectRowSet.fromContainer(aggBatch.getContainer()));

        Assert.assertSame(RecordBatch.IterOutcome.EMIT, aggBatch.next());
        Assert.assertEquals(1, aggBatch.getRecordCount());

        dataRow.clear();
        expected.clear();
    }
}
