/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.pen.util.ExampleTuple;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class POStream
extends PhysicalOperator {
    private static final long serialVersionUID = 2L;
    private static final Result EOP_RESULT = new Result(3, null);
    private String executableManagerStr;
    private transient ExecutableManager executableManager;
    private StreamingCommand command;
    private Properties properties;
    private boolean initialized = false;
    protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
    protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
    protected boolean allInputFromPredecessorConsumed = false;
    protected boolean allOutputFromBinaryProcessed = false;

    public POStream(OperatorKey k, ExecutableManager executableManager, StreamingCommand command, Properties properties) {
        super(k);
        this.executableManagerStr = executableManager.getClass().getName();
        this.command = command;
        this.properties = properties;
        if (command.getShipFiles()) {
            POStream.parseShipCacheSpecs(command.getShipSpecs(), properties, "pig.streaming.ship.files");
        }
        POStream.parseShipCacheSpecs(command.getCacheSpecs(), properties, "pig.streaming.cache.files");
    }

    private static void parseShipCacheSpecs(List<String> specs, Properties properties, String property) {
        String existingValue = properties.getProperty(property, "");
        if (specs == null || specs.size() == 0) {
            return;
        }
        StringBuffer sb = new StringBuffer();
        Iterator<String> i = specs.iterator();
        if (!existingValue.equals("")) {
            sb.append(existingValue);
            if (i.hasNext()) {
                sb.append(", ");
            }
        }
        while (i.hasNext()) {
            sb.append(i.next());
            if (!i.hasNext()) continue;
            sb.append(", ");
        }
        properties.setProperty(property, sb.toString());
    }

    public Properties getShipCacheProperties() {
        return this.properties;
    }

    public StreamingCommand getCommand() {
        return this.command;
    }

    @Override
    public Result getNextTuple() throws ExecException {
        try {
            if (this.allOutputFromBinaryProcessed) {
                return new Result(3, null);
            }
            if (this.allInputFromPredecessorConsumed) {
                Result r = this.binaryOutputQueue.take();
                if (r.returnStatus == 4) {
                    r = EOP_RESULT;
                } else if (r.returnStatus == 0) {
                    this.illustratorMarkup(r.result, r.result, 0);
                }
                return r;
            }
            if (this.parentPlan.endOfAllInput) {
                Result r = this.getNextHelper(null);
                if (r.returnStatus == 3) {
                    if (this.getInitialized()) {
                        this.binaryInputQueue.put(r);
                        this.allInputFromPredecessorConsumed = true;
                        r = this.binaryOutputQueue.take();
                        if (r.returnStatus == 4) {
                            r = EOP_RESULT;
                        }
                    }
                } else if (r.returnStatus == 4) {
                    r = EOP_RESULT;
                } else if (r.returnStatus == 0) {
                    this.illustratorMarkup(r.result, r.result, 0);
                }
                return r;
            }
            Result r = this.getNextHelper(null);
            if (r.returnStatus == 4) {
                r = EOP_RESULT;
                this.allOutputFromBinaryProcessed = true;
            } else if (r.returnStatus == 0) {
                this.illustratorMarkup(r.result, r.result, 0);
            }
            return r;
        }
        catch (Exception e) {
            int errCode = 2083;
            String msg = "Error while trying to get next result in POStream.";
            throw new ExecException(msg, errCode, 4, e);
        }
    }

    public synchronized boolean getInitialized() {
        return this.initialized;
    }

    public synchronized void setInitialized(boolean initialized) {
        this.initialized = initialized;
    }

    /*
     * Unable to fully structure code
     */
    public Result getNextHelper(Tuple t) throws ExecException {
        try {
            var2_2 = this;
            synchronized (var2_2) {
                block7: while (true) {
                    if (!this.binaryOutputQueue.isEmpty()) {
                        res = this.binaryOutputQueue.take();
                        return res;
                    }
                    if (this.binaryInputQueue.remainingCapacity() > 0) {
                        input = this.processInput();
                        if (input.returnStatus == 3 || input.returnStatus == 2) {
                            return input;
                        }
                        if (!this.initialized) {
                            this.executableManager = (ExecutableManager)PigContext.instantiateFuncFromSpec(this.executableManagerStr);
                            try {
                                this.executableManager.configure(this);
                                this.executableManager.run();
                            }
                            catch (IOException ioe) {
                                errCode = 2084;
                                msg = "Error while running streaming binary.";
                                throw new ExecException(msg, errCode, 4, ioe);
                            }
                            this.initialized = true;
                        }
                        this.binaryInputQueue.put(input);
                        continue;
                    }
                    while (true) {
                        if (this.binaryOutputQueue.isEmpty() && !this.binaryInputQueue.isEmpty()) ** break;
                        continue block7;
                        this.wait();
                    }
                    break;
                }
            }
        }
        catch (Exception e) {
            errCode = 2083;
            msg = "Error while trying to get next result in POStream.";
            throw new ExecException(msg, errCode, 4, e);
        }
    }

    @Override
    public String toString() {
        return this.getAliasString() + "POStream" + "[" + this.command.toString() + "]" + " - " + this.mKey.toString();
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitStream(this);
    }

    @Override
    public String name() {
        return this.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    public void finish() throws IOException {
        this.executableManager.close();
    }

    public BlockingQueue<Result> getBinaryInputQueue() {
        return this.binaryInputQueue;
    }

    public BlockingQueue<Result> getBinaryOutputQueue() {
        return this.binaryOutputQueue;
    }

    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        if (this.illustrator != null) {
            ExampleTuple tIn = (ExampleTuple)in;
            this.illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn);
            this.illustrator.addData((Tuple)out);
        }
        return (Tuple)out;
    }
}

