/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.orderedpartitioner;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.math.stat.descriptive.moment.Mean;
import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore(value="Disabled until alternative to distributed cache provided.")
public class TestOrderedPartitionExchange
extends PopUnitTestBase {
    static final Logger logger = LoggerFactory.getLogger(TestOrderedPartitionExchange.class);

    @Test
    public void twoBitTwoExchangeRun() throws Exception {
        RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
        try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
             Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
             DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
            bit1.run();
            bit2.run();
            client.connect();
            List results = client.runQuery(UserBitShared.QueryType.PHYSICAL, Files.toString((File)FileUtils.getResourceAsFile((String)"/sender/ordered_exchange.json"), (Charset)Charsets.UTF_8));
            int count = 0;
            ArrayList partitionRecordCounts = Lists.newArrayList();
            for (QueryDataBatch b : results) {
                if (b.getData() != null) {
                    int rows = b.getHeader().getRowCount();
                    count += rows;
                    RecordBatchLoader loader = new RecordBatchLoader(new BootStrapContext(DrillConfig.create()).getAllocator());
                    loader.load(b.getHeader().getDef(), b.getData());
                    BigIntVector vv1 = (BigIntVector)loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("col1", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
                    Float8Vector vv2 = (Float8Vector)loader.getValueAccessorById(Float8Vector.class, loader.getValueVectorId(new SchemaPath("col2", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
                    IntVector pVector = (IntVector)loader.getValueAccessorById(IntVector.class, loader.getValueVectorId(new SchemaPath("partition", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
                    long previous1 = Long.MIN_VALUE;
                    double previous2 = Double.MIN_VALUE;
                    int partPrevious = -1;
                    long current1 = Long.MIN_VALUE;
                    double current2 = Double.MIN_VALUE;
                    int partCurrent = -1;
                    int partitionRecordCount = 0;
                    for (int i = 0; i < rows; ++i) {
                        previous1 = current1;
                        previous2 = current2;
                        partPrevious = partCurrent;
                        current1 = vv1.getAccessor().get(i);
                        current2 = vv2.getAccessor().get(i);
                        partCurrent = pVector.getAccessor().get(i);
                        Assert.assertTrue((current1 >= previous1 ? 1 : 0) != 0);
                        if (current1 == previous1) {
                            Assert.assertTrue((current2 <= previous2 ? 1 : 0) != 0);
                        }
                        if (partCurrent == partPrevious || partPrevious == -1) {
                            ++partitionRecordCount;
                            continue;
                        }
                        partitionRecordCounts.add(partitionRecordCount);
                        partitionRecordCount = 0;
                    }
                    partitionRecordCounts.add(partitionRecordCount);
                    loader.clear();
                }
                b.release();
            }
            double[] values = new double[partitionRecordCounts.size()];
            int i = 0;
            for (Integer rc : partitionRecordCounts) {
                values[i++] = rc.doubleValue();
            }
            StandardDeviation stdDev = new StandardDeviation();
            Mean mean = new Mean();
            double std = stdDev.evaluate(values);
            double m = mean.evaluate(values);
            System.out.println("mean: " + m + " std dev: " + std);
            Assert.assertEquals((long)31000L, (long)count);
        }
    }
}

