/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SelfSpillBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Spillable;
import org.apache.pig.impl.util.SpillableMemoryManager;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class POPartialAgg
extends PhysicalOperator
implements Spillable {
    private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
    private static final long serialVersionUID = 1L;
    private static final Result ERR_RESULT = new Result();
    private static final Result EOP_RESULT = new Result(3, null);
    private static final int NUM_RECS_TO_SAMPLE = 10000;
    private static final int MAX_LIST_SIZE = 9368;
    private static final int DEFAULT_MIN_REDUCTION = 10;
    private static final int FIRST_TIER_THRESHOLD = 20000;
    private static final int SECOND_TIER_THRESHOLD = 2000;
    private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap();
    private static final TupleFactory TF = TupleFactory.getInstance();
    private static final BagFactory BG = BagFactory.getInstance();
    private PhysicalPlan keyPlan;
    private ExpressionOperator keyLeaf;
    private List<PhysicalPlan> valuePlans;
    private List<ExpressionOperator> valueLeaves;
    private int numRecsInRawMap = 0;
    private int numRecsInProcessedMap = 0;
    private Map<Object, List<Tuple>> rawInputMap = Maps.newHashMap();
    private Map<Object, List<Tuple>> processedInputMap = Maps.newHashMap();
    private boolean disableMapAgg = false;
    private boolean sizeReductionChecked = false;
    private boolean inputsExhausted = false;
    private volatile boolean doSpill = false;
    private transient SelfSpillBag.MemoryLimits memLimits;
    private transient boolean initialized = false;
    private int firstTierThreshold = 20000;
    private int secondTierThreshold = 2000;
    private int sizeReduction = 1;
    private int avgTupleSize = 0;
    private Iterator<Map.Entry<Object, List<Tuple>>> spillingIterator;
    private boolean estimatedMemThresholds = false;

    public POPartialAgg(OperatorKey k) {
        super(k);
    }

    private void init() throws ExecException {
        ALL_POPARTS.put(this, null);
        float percent = this.getPercentUsageFromProp();
        if (percent <= 0.0f) {
            LOG.info((Object)"No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
            this.disableMapAgg();
        }
        this.initialized = true;
        SpillableMemoryManager.getInstance().registerSpillable(this);
    }

    @Override
    public Result getNextTuple() throws ExecException {
        if (!this.initialized && !ALL_POPARTS.containsKey(this)) {
            this.init();
        }
        while (true) {
            if (!this.sizeReductionChecked && this.numRecsInRawMap >= 10000) {
                this.checkSizeReduction();
            }
            if (!this.estimatedMemThresholds && this.numRecsInRawMap >= 10000) {
                this.estimateMemThresholds();
            }
            if (this.doSpill) {
                this.startSpill();
                Result result = this.spillResult();
                if (result == EOP_RESULT) {
                    this.doSpill = false;
                }
                if (result != EOP_RESULT || this.inputsExhausted) {
                    return result;
                }
            }
            if (this.mapAggDisabled()) {
                return this.processInput();
            }
            Result inp = this.processInput();
            if (inp.returnStatus == 2) {
                return inp;
            }
            if (inp.returnStatus == 3) {
                if (this.parentPlan.endOfAllInput) {
                    this.inputsExhausted = true;
                    this.startSpill();
                    LOG.info((Object)"Spilling last bits.");
                    continue;
                }
                return EOP_RESULT;
            }
            if (inp.returnStatus == 1) continue;
            Tuple inpTuple = (Tuple)inp.result;
            this.keyPlan.attachInput(inpTuple);
            Result keyRes = this.getResult(this.keyLeaf);
            if (keyRes == ERR_RESULT) {
                return ERR_RESULT;
            }
            Object key = keyRes.result;
            this.keyPlan.detachInput();
            ++this.numRecsInRawMap;
            this.addKeyValToMap(this.rawInputMap, key, inpTuple);
            if (this.shouldAggregateFirstLevel()) {
                this.aggregateFirstLevel();
            }
            if (this.shouldAggregateSecondLevel()) {
                this.aggregateSecondLevel();
            }
            if (!this.shouldSpill()) continue;
            LOG.info((Object)"Starting spill.");
            this.startSpill();
        }
    }

    private void estimateMemThresholds() {
        if (!this.mapAggDisabled()) {
            LOG.info((Object)("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects."));
            float percent = this.getPercentUsageFromProp();
            this.memLimits = new SelfSpillBag.MemoryLimits(ALL_POPARTS.size(), percent);
            int estTotalMem = 0;
            int estTuples = 0;
            for (Map.Entry<Object, List<Tuple>> entry : this.rawInputMap.entrySet()) {
                for (Tuple t : entry.getValue()) {
                    ++estTuples;
                    int mem = (int)t.getMemorySize();
                    estTotalMem += mem;
                    this.memLimits.addNewObjSize(mem);
                }
            }
            this.avgTupleSize = estTotalMem / estTuples;
            int totalTuples = this.memLimits.getCacheLimit();
            LOG.info((Object)("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples));
            this.firstTierThreshold = (int)(0.5 + (double)((float)totalTuples * (1.0f - 1.0f / (float)this.sizeReduction)));
            this.secondTierThreshold = (int)(0.5 + (double)((float)totalTuples * (1.0f / (float)this.sizeReduction)));
            LOG.info((Object)("Setting thresholds. Primary: " + this.firstTierThreshold + ". Secondary: " + this.secondTierThreshold));
        }
        this.estimatedMemThresholds = true;
    }

    private void checkSizeReduction() throws ExecException {
        int numBeforeReduction = this.numRecsInProcessedMap + this.numRecsInRawMap;
        this.aggregateFirstLevel();
        this.aggregateSecondLevel();
        int numAfterReduction = this.numRecsInProcessedMap + this.numRecsInRawMap;
        LOG.info((Object)("After reduction, processed map: " + this.numRecsInProcessedMap + "; raw map: " + this.numRecsInRawMap));
        int minReduction = this.getMinOutputReductionFromProp();
        LOG.info((Object)("Observed reduction factor: from " + numBeforeReduction + " to " + numAfterReduction + " => " + numBeforeReduction / numAfterReduction + "."));
        if (numBeforeReduction / numAfterReduction < minReduction) {
            LOG.info((Object)("Disabling in-memory aggregation, since observed reduction is less than " + minReduction));
            this.disableMapAgg();
        }
        this.sizeReduction = numBeforeReduction / numAfterReduction;
        this.sizeReductionChecked = true;
    }

    private void disableMapAgg() throws ExecException {
        this.startSpill();
        this.disableMapAgg = true;
    }

    private boolean mapAggDisabled() {
        return this.disableMapAgg;
    }

    private boolean shouldAggregateFirstLevel() {
        if (LOG.isInfoEnabled() && this.numRecsInRawMap > this.firstTierThreshold) {
            LOG.info((Object)("Aggregating " + this.numRecsInRawMap + " raw records."));
        }
        return this.numRecsInRawMap > this.firstTierThreshold;
    }

    private boolean shouldAggregateSecondLevel() {
        if (LOG.isInfoEnabled() && this.numRecsInProcessedMap > this.secondTierThreshold) {
            LOG.info((Object)("Aggregating " + this.numRecsInProcessedMap + " secondary records."));
        }
        return this.numRecsInProcessedMap > this.secondTierThreshold;
    }

    private boolean shouldSpill() {
        return this.shouldAggregateSecondLevel();
    }

    private void addKeyValToMap(Map<Object, List<Tuple>> map, Object key, Tuple inpTuple) throws ExecException {
        List<Tuple> value = map.get(key);
        if (value == null) {
            value = new ArrayList<Tuple>();
            map.put(key, value);
        }
        value.add(inpTuple);
        if (value.size() >= 9368) {
            boolean isFirst;
            boolean bl = isFirst = map == this.rawInputMap;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The cache for key " + key + " has grown too large. Aggregating " + (isFirst ? "first level." : "second level.")));
            }
            if (isFirst) {
                this.aggregateRawRow(key);
            } else {
                this.aggregateSecondLevel();
            }
        }
    }

    private void startSpill() throws ExecException {
        if (this.spillingIterator != null) {
            return;
        }
        if (!this.rawInputMap.isEmpty()) {
            if (LOG.isInfoEnabled()) {
                LOG.info((Object)("In startSpill(), aggregating raw inputs. " + this.numRecsInRawMap + " tuples."));
            }
            this.aggregateFirstLevel();
            if (LOG.isInfoEnabled()) {
                LOG.info((Object)("processed inputs: " + this.numRecsInProcessedMap + " tuples."));
            }
        }
        if (!this.processedInputMap.isEmpty()) {
            if (LOG.isInfoEnabled()) {
                LOG.info((Object)("In startSpill(), aggregating processed inputs. " + this.numRecsInProcessedMap + " tuples."));
            }
            this.aggregateSecondLevel();
            if (LOG.isInfoEnabled()) {
                LOG.info((Object)("processed inputs: " + this.numRecsInProcessedMap + " tuples."));
            }
        }
        this.doSpill = true;
        this.spillingIterator = this.processedInputMap.entrySet().iterator();
    }

    private Result spillResult() throws ExecException {
        if (this.processedInputMap.isEmpty()) {
            this.spillingIterator = null;
            LOG.info((Object)"In spillResults(), processed map is empty -- done spilling.");
            return EOP_RESULT;
        }
        Map.Entry<Object, List<Tuple>> entry = this.spillingIterator.next();
        Tuple valueTuple = this.createValueTuple(entry.getKey(), entry.getValue());
        this.numRecsInProcessedMap -= entry.getValue().size();
        this.spillingIterator.remove();
        Result res = this.getOutput(entry.getKey(), valueTuple);
        return res;
    }

    private void aggregateRawRow(Object key) throws ExecException {
        List<Tuple> value = this.rawInputMap.get(key);
        Tuple valueTuple = this.createValueTuple(key, value);
        Result res = this.getOutput(key, valueTuple);
        this.rawInputMap.remove(key);
        this.addKeyValToMap(this.processedInputMap, key, this.getAggResultTuple(res.result));
        this.numRecsInProcessedMap += valueTuple.size() - 1;
    }

    private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
        Iterator<Map.Entry<Object, List<Tuple>>> iter = fromMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Object, List<Tuple>> entry = iter.next();
            Tuple valueTuple = this.createValueTuple(entry.getKey(), entry.getValue());
            Result res = this.getOutput(entry.getKey(), valueTuple);
            iter.remove();
            this.addKeyValToMap(toMap, entry.getKey(), this.getAggResultTuple(res.result));
            numEntriesInTarget += valueTuple.size() - 1;
        }
        return numEntriesInTarget;
    }

    private void aggregateFirstLevel() throws ExecException {
        this.numRecsInProcessedMap = this.aggregate(this.rawInputMap, this.processedInputMap, this.numRecsInProcessedMap);
        this.numRecsInRawMap = 0;
    }

    private void aggregateSecondLevel() throws ExecException {
        HashMap<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(this.processedInputMap.size());
        this.numRecsInProcessedMap = this.aggregate(this.processedInputMap, newMap, 0);
        this.processedInputMap = newMap;
    }

    private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
        Tuple valueTuple = TF.newTuple(this.valuePlans.size() + 1);
        valueTuple.set(0, key);
        for (int i = 0; i < this.valuePlans.size(); ++i) {
            DataBag bag = BG.newDefaultBag();
            valueTuple.set(i + 1, bag);
        }
        for (Tuple t : inpTuples) {
            for (int i = 1; i < t.size(); ++i) {
                DataBag bag = (DataBag)valueTuple.get(i);
                bag.add((Tuple)t.get(i));
            }
        }
        return valueTuple;
    }

    private Tuple getAggResultTuple(Object result) throws ExecException {
        try {
            return (Tuple)result;
        }
        catch (ClassCastException ex) {
            throw new ExecException("Intermediate Algebraic functions must implement EvalFunc<Tuple>");
        }
    }

    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitPartialAgg(this);
    }

    private int getMinOutputReductionFromProp() {
        int minReduction = ((Configuration)PigMapReduce.sJobConfInternal.get()).getInt("pig.exec.mapPartAgg.minReduction", 10);
        if (minReduction <= 0) {
            LOG.info((Object)("Specified reduction is < 0 (" + minReduction + "). Using default " + 10));
            minReduction = 10;
        }
        return minReduction;
    }

    private float getPercentUsageFromProp() {
        String usage;
        float percent = 0.2f;
        if (PigMapReduce.sJobConfInternal.get() != null && (usage = ((Configuration)PigMapReduce.sJobConfInternal.get()).get("pig.cachedbag.memusage")) != null) {
            percent = Float.parseFloat(usage);
        }
        return percent;
    }

    private Result getResult(ExpressionOperator op) throws ExecException {
        Result res = ERR_RESULT;
        switch (op.getResultType()) {
            case 5: 
            case 10: 
            case 15: 
            case 20: 
            case 25: 
            case 30: 
            case 50: 
            case 55: 
            case 65: 
            case 70: 
            case 100: 
            case 110: 
            case 120: {
                res = op.getNext(op.getResultType());
                break;
            }
            default: {
                String msg = "Invalid result type: " + DataType.findType(op.getResultType());
                throw new ExecException(msg, 2270, 4);
            }
        }
        if (res.returnStatus == 0 || res.returnStatus == 1) {
            return res;
        }
        return ERR_RESULT;
    }

    private Result getOutput(Object key, Tuple value) throws ExecException {
        Tuple output = TF.newTuple(this.valuePlans.size() + 1);
        output.set(0, key);
        for (int i = 0; i < this.valuePlans.size(); ++i) {
            this.valuePlans.get(i).attachInput(value);
            Result valRes = this.getResult(this.valueLeaves.get(i));
            if (valRes == ERR_RESULT) {
                return ERR_RESULT;
            }
            output.set(i + 1, valRes.result);
        }
        return new Result(0, output);
    }

    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    @Override
    public String name() {
        return this.getAliasString() + "Partial Agg" + "[" + DataType.findTypeName(this.resultType) + "]" + this.mKey.toString();
    }

    public PhysicalPlan getKeyPlan() {
        return this.keyPlan;
    }

    public void setKeyPlan(PhysicalPlan keyPlan) {
        this.keyPlan = keyPlan;
        this.keyLeaf = (ExpressionOperator)keyPlan.getLeaves().get(0);
    }

    public List<PhysicalPlan> getValuePlans() {
        return this.valuePlans;
    }

    public void setValuePlans(List<PhysicalPlan> valuePlans) {
        this.valuePlans = valuePlans;
        this.valueLeaves = new ArrayList<ExpressionOperator>();
        for (PhysicalPlan plan : valuePlans) {
            this.valueLeaves.add((ExpressionOperator)plan.getLeaves().get(0));
        }
    }

    @Override
    public long spill() {
        LOG.info((Object)"Spill triggered by SpillableMemoryManager");
        this.doSpill = true;
        return 0L;
    }

    @Override
    public long getMemorySize() {
        return this.avgTupleSize * (this.numRecsInProcessedMap + this.numRecsInRawMap);
    }
}

