/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.fetch;

import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchPOStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchProgressableReporter;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchTaskContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTimeZone;

public class FetchLauncher {
    private final PigContext pigContext;
    private final Configuration conf;

    public FetchLauncher(PigContext pigContext) {
        this.pigContext = pigContext;
        this.conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PigStats launchPig(PhysicalPlan pp) throws IOException {
        try {
            POStore poStore = (POStore)pp.getLeaves().get(0);
            this.init(pp, poStore);
            this.runPipeline(poStore);
            UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp, (PlanWalker<PhysicalOperator, PhysicalPlan>)new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
            udfFinisher.visit();
            PigStats pigStats = PigStats.start(new EmptyPigStats(this.pigContext, poStore));
            return pigStats;
        }
        finally {
            UDFContext.getUDFContext().addJobConf(null);
        }
    }

    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, String format) throws PlanException, VisitorException, IOException {
        if ("xml".equals(format)) {
            ps.println("<mapReducePlan>No MR jobs. Fetch only</mapReducePlan>");
        } else {
            ps.println("#--------------------------------------------------");
            ps.println("# Map Reduce Plan                                  ");
            ps.println("#--------------------------------------------------");
            ps.println("No MR jobs. Fetch only.");
        }
    }

    private void init(PhysicalPlan pp, POStore poStore) throws IOException {
        poStore.setStoreImpl(new FetchPOStoreImpl(this.pigContext));
        poStore.setUp();
        TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
        HadoopShims.setTaskAttemptId(this.conf, taskAttemptID);
        if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
            MapRedUtil.setupStreamingDirsConfSingle(poStore, this.pigContext, this.conf);
        }
        String currentTime = Long.toString(System.currentTimeMillis());
        this.conf.set("pig.script.submitted.timestamp", currentTime);
        this.conf.set("pig.job.submitted.timestamp", currentTime);
        PhysicalOperator.setReporter(new FetchProgressableReporter());
        SchemaTupleBackend.initialize(this.conf, this.pigContext);
        UDFContext udfContext = UDFContext.getUDFContext();
        udfContext.addJobConf(this.conf);
        udfContext.setClientSystemProps(this.pigContext.getProperties());
        udfContext.serialize(this.conf);
        PigMapReduce.sJobConfInternal.set(this.conf);
        String dtzStr = this.conf.get("pig.datetime.default.tz");
        if (dtzStr != null && dtzStr.length() > 0) {
            DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
        }
        boolean aggregateWarning = "true".equalsIgnoreCase(this.conf.get("aggregate.warning"));
        PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
        pigStatusReporter.setContext(new FetchTaskContext(new FetchContext()));
        PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
        pigHadoopLogger.setReporter(pigStatusReporter);
        pigHadoopLogger.setAggregate(aggregateWarning);
        PhysicalOperator.setPigLogger(pigHadoopLogger);
    }

    private void runPipeline(POStore posStore) throws IOException {
        Result res;
        while (true) {
            res = posStore.getNextTuple();
            if (res.returnStatus == 0) continue;
            if (res.returnStatus == 3) {
                posStore.tearDown();
                return;
            }
            if (res.returnStatus != 1 && res.returnStatus == 2) break;
        }
        String errMsg = res.result != null ? "Fetch failed. Couldn't retrieve result: " + res.result : "Fetch failed. Couldn't retrieve result";
        int errCode = 2088;
        ExecException ee = new ExecException(errMsg, errCode, 4);
        throw ee;
    }
}

