/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.pipes;

import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.pipes.Application;
import org.apache.hadoop.mapred.pipes.DownwardProtocol;
import org.apache.hadoop.mapred.pipes.Submitter;

/**
 * A {@link Reducer} that forwards each key and its values to a pipes
 * {@link Application} through that application's {@link DownwardProtocol} link.
 *
 * <p>The application is started lazily: on the first {@link #reduce} call, or in
 * {@link #close()} when no {@code reduce} call started it. {@link #close()} then
 * either signals end of input or aborts, depending on whether the last
 * {@code reduce} call ran to completion.
 *
 * @param <K2> key type received by the reducer
 * @param <V2> value type received by the reducer
 * @param <K3> key type handed to the output collector
 * @param <V3> value type handed to the output collector
 */
class PipesReducer<K2 extends WritableComparable, V2 extends Writable, K3 extends WritableComparable, V3 extends Writable>
        implements Reducer<K2, V2, K3, V3> {
    private static final Log LOG = LogFactory.getLog(PipesReducer.class.getName());

    /** Job configuration captured in {@link #configure(JobConf)}. */
    private JobConf job;
    /** The pipes application; {@code null} until {@link #startApplication} succeeds in creating it. */
    private Application<K2, V2, K3, V3> application = null;
    /** Link obtained from {@link #application}; {@code null} until the application is created. */
    private DownwardProtocol<K2, V2> downlink = null;
    /** {@code false} while a {@code reduce} call is in flight, and left {@code false} if that call throws. */
    private boolean isOk = true;
    /** Value of the {@code mapreduce.job.skiprecords} setting read in {@link #configure(JobConf)}. */
    private boolean skipping = false;

    PipesReducer() {
    }

    /**
     * Stores the job configuration, switches off the automatic reducer
     * processed-record count increment via {@link SkipBadRecords}, and reads the
     * skip-records flag.
     *
     * @param job the job configuration
     */
    @Override
    public void configure(JobConf job) {
        this.job = job;
        SkipBadRecords.setAutoIncrReducerProcCount(job, false);
        this.skipping = job.getBoolean("mapreduce.job.skiprecords", false);
    }

    /**
     * Sends one key and all of its values down the link, starting the application
     * first if it has not been started yet.
     *
     * @param key      the key to send
     * @param values   the values belonging to {@code key}
     * @param output   collector handed to the application when it is started
     * @param reporter reporter handed to the application when it is started
     * @throws IOException if starting the application or writing to the link fails
     */
    @Override
    public void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
        // Marked not-ok up front so that close() aborts if anything below throws.
        this.isOk = false;
        this.startApplication(output, reporter);
        this.downlink.reduceKey(key);
        while (values.hasNext()) {
            this.downlink.reduceValue(values.next());
        }
        if (this.skipping) {
            // In skip mode the link is flushed after every key group.
            this.downlink.flush();
        }
        this.isOk = true;
    }

    /**
     * Creates the application and issues {@code runReduce} on its downlink. Does
     * nothing if the application already exists.
     *
     * @param output   collector passed to the application
     * @param reporter reporter passed to the application
     * @throws IOException      if creating the application or the {@code runReduce} call fails
     * @throws RuntimeException if the thread is interrupted while the application is being created
     */
    @SuppressWarnings("unchecked")
    private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
        if (this.application != null) {
            return;
        }
        try {
            LOG.info("starting application");
            // The configured output classes are untyped; the casts assume they match K3/V3.
            this.application = new Application<K2, V2, K3, V3>(
                    this.job, null, output, reporter,
                    (Class<? extends K3>) this.job.getOutputKeyClass(),
                    (Class<? extends V3>) this.job.getOutputValueClass());
            this.downlink = this.application.getDownlink();
        }
        catch (InterruptedException ie) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", ie);
        }
        this.downlink.runReduce(0, Submitter.getIsJavaRecordWriter(this.job));
    }

    /**
     * Shuts the application down. If the reducer is in the ok state the application
     * is started (with a collector that discards everything) when it is not running
     * yet, and end of input is signalled; otherwise an abort is sent. The method then
     * waits for the application to finish and always runs its cleanup.
     *
     * <p>If the application was never created because startup failed during
     * {@code reduce}, there is nothing to shut down and the method returns.
     *
     * @throws IOException if starting, aborting, or cleaning up the application fails
     */
    @Override
    public void close() throws IOException {
        if (this.isOk) {
            OutputCollector<K3, V3> nullCollector = new OutputCollector<K3, V3>() {

                @Override
                public void collect(K3 key, V3 value) throws IOException {
                    // Intentionally discards all output.
                }
            };
            this.startApplication(nullCollector, Reporter.NULL);
        }
        if (this.application == null) {
            // Startup failed inside reduce(); dereferencing the application here would
            // only raise a NullPointerException that hides the original failure.
            LOG.warn("application was never started; nothing to close");
            return;
        }
        try {
            if (this.isOk) {
                this.application.getDownlink().endOfInput();
            } else {
                this.application.getDownlink().abort();
            }
            LOG.info("waiting for finish");
            this.application.waitForFinish();
            LOG.info("got done");
        }
        catch (Throwable t) {
            this.application.abort(t);
        }
        finally {
            this.application.cleanup();
        }
    }
}

