/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.TerminatingAccumulator;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Adapts the push-style {@link #accumulate(Tuple)} calls into a pull-style
 * {@link Iterator} that is handed to {@link #exec(Iterator)}.
 *
 * <p>On first use a worker thread is started that runs {@code exec} against an
 * iterator backed by a bounded {@link BlockingQueue}. {@code accumulate} feeds
 * tuples into that queue, {@link #getValue()} signals end-of-input and waits for
 * the worker to finish, and {@link #cleanup()} resets the instance so that it can
 * be used for another round.
 *
 * <p>Any failure raised by {@code exec} on the worker thread is re-thrown to the
 * caller of {@code accumulate} / {@code getValue} wrapped in a
 * {@link RuntimeException}.
 *
 * @param <T> the type of value produced by {@link #exec(Iterator)}
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class IteratingAccumulatorEvalFunc<T>
extends AccumulatorEvalFunc<T>
implements TerminatingAccumulator<T> {
    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();

    /** Maximum number of tuples buffered between the caller and the worker thread. */
    private static final int QUEUE_CAPACITY = 10000;
    /** Milliseconds to wait on a full queue before re-checking the worker's state. */
    private static final long WAIT_TO_OFFER = 500L;
    /** Milliseconds the worker waits on an empty queue before re-checking for end-of-input. */
    private static final long WAIT_TO_POLL = 500L;
    /** Milliseconds to wait for the worker to terminate before re-checking for a failure. */
    private static final long WAIT_TO_JOIN = 500L;

    /** True once the queue, iterator and worker thread have been created for the current round. */
    private boolean isInitialized = false;
    private DelayedQueueIterator dqi;
    private BlockingQueue<Tuple> queue;
    /** Set by the worker when {@code exec} returned normally. */
    private volatile boolean isFinished = false;
    /** Set by {@link #getValue()} to tell the iterator that no further tuples will be queued. */
    private volatile boolean noMoreValues = false;
    /** Set by the worker (after storing the cause) when {@code exec} failed. */
    private volatile boolean exceptionThrown = false;
    private T returnValue;
    private Thread executionThread;
    /** Cause of the worker failure; written before {@link #exceptionThrown} is set. */
    private Throwable executionThreadException;

    /**
     * Creates the queue and iterator and starts the worker thread that runs
     * {@link #exec(Iterator)}.
     */
    private void initialize() {
        this.dqi = new DelayedQueueIterator();
        this.queue = new LinkedBlockingQueue<Tuple>(QUEUE_CAPACITY);
        this.executionThread = new Thread(new Runnable() {

            public void run() {
                try {
                    IteratingAccumulatorEvalFunc.this.returnValue =
                            IteratingAccumulatorEvalFunc.this.exec(IteratingAccumulatorEvalFunc.this.dqi);
                    IteratingAccumulatorEvalFunc.this.isFinished = true;
                } catch (Throwable t) {
                    // Catch Throwable rather than Exception: an Error escaping exec would
                    // otherwise kill the worker silently, leaving accumulate() spinning on a
                    // full queue and getValue() returning null.
                    IteratingAccumulatorEvalFunc.this.executionThreadException = t;
                    IteratingAccumulatorEvalFunc.this.exceptionThrown = true;
                }
            }
        });
        this.executionThread.start();
        this.isInitialized = true;
    }

    /**
     * @return {@code true} once {@link #exec(Iterator)} has returned normally on the
     *         worker thread, i.e. no further input is needed
     */
    @Override
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Feeds every tuple of the bag found in field 0 of {@code input} to the worker
     * thread, starting the worker on first use. Returns early once the worker has
     * finished.
     *
     * @param input tuple whose first field is the {@link DataBag} to consume
     * @throws IOException if the first field cannot be read from {@code input}
     * @throws RuntimeException wrapping the worker's failure if {@code exec} failed
     */
    @Override
    public void accumulate(Tuple input) throws IOException {
        if (!this.isInitialized) {
            this.initialize();
        }
        // Remember interrupts instead of re-asserting them immediately: with the flag
        // set, offer() would throw straight away and the retry loop would busy-spin.
        boolean interrupted = false;
        try {
            for (Tuple t : (DataBag) input.get(0)) {
                if (this.isFinished) {
                    return;
                }
                boolean added = false;
                while (!(this.isFinished || added || this.exceptionThrown)) {
                    try {
                        added = this.queue.offer(t, WAIT_TO_OFFER, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                if (this.exceptionThrown) {
                    throw new RuntimeException("Exception thrown in thread: ", this.executionThreadException);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Signals end-of-input to the worker, waits for it to terminate and returns the
     * value produced by {@link #exec(Iterator)}.
     *
     * <p>If no input was accumulated in this round, the worker is started here so that
     * {@code exec} runs over an empty iterator.
     *
     * @return the value returned by {@code exec}
     * @throws RuntimeException wrapping the worker's failure if {@code exec} failed
     */
    @Override
    public T getValue() {
        if (!this.isInitialized) {
            // Previously this path hit a NullPointerException on executionThread.
            this.initialize();
        }
        this.noMoreValues = true;
        boolean interrupted = false;
        try {
            while (this.executionThread.isAlive()) {
                this.rethrowWorkerFailure();
                try {
                    this.executionThread.join(WAIT_TO_JOIN);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        // The worker may have failed during the last join; check once more so the
        // failure is reported instead of returning a null result.
        this.rethrowWorkerFailure();
        return this.returnValue;
    }

    /** Throws the worker's failure, wrapped in a RuntimeException, if one was recorded. */
    private void rethrowWorkerFailure() {
        if (this.exceptionThrown) {
            throw new RuntimeException("Exception thrown in thread: ", this.executionThreadException);
        }
    }

    /**
     * Resets all per-round state, including any recorded worker failure, so the
     * next {@link #accumulate(Tuple)} starts a fresh worker.
     */
    @Override
    public void cleanup() {
        this.returnValue = null;
        this.dqi = null;
        this.queue = null;
        this.isFinished = false;
        this.noMoreValues = false;
        // A failure from a previous round must not leak into the next one.
        this.exceptionThrown = false;
        this.executionThreadException = null;
        this.executionThread = null;
        this.isInitialized = false;
    }

    /**
     * Computes the result from the stream of accumulated tuples. Invoked on the
     * worker thread.
     *
     * @param var1 iterator over the tuples passed to {@link #accumulate(Tuple)}
     * @return the value to be returned from {@link #getValue()}
     * @throws IOException if the computation fails
     */
    public abstract T exec(Iterator<Tuple> var1) throws IOException;

    /**
     * Iterator over the tuples placed on the enclosing instance's queue. It blocks
     * in {@link #hasNext()} until a tuple arrives or end-of-input has been signalled.
     */
    private class DelayedQueueIterator
    implements Iterator<Tuple> {
        /** Tuple fetched by {@link #hasNext()} and not yet handed out, or {@code null}. */
        private Tuple next;

        private DelayedQueueIterator() {
        }

        /**
         * Waits for the next tuple.
         *
         * @return {@code true} if a tuple is available; {@code false} once end-of-input
         *         has been signalled and the queue is drained
         * @throws RuntimeException if the waiting thread is interrupted
         */
        @Override
        public boolean hasNext() {
            if (this.next != null) {
                return true;
            }
            while (!IteratingAccumulatorEvalFunc.this.noMoreValues) {
                try {
                    this.next = IteratingAccumulatorEvalFunc.this.queue.poll(WAIT_TO_POLL, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for code further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (this.next != null) {
                    return true;
                }
            }
            // End-of-input was signalled; pick up anything queued before the signal.
            this.next = IteratingAccumulatorEvalFunc.this.queue.poll();
            return this.next != null;
        }

        /**
         * Returns the next tuple, fetching it first if {@link #hasNext()} has not
         * been called.
         *
         * @throws NoSuchElementException if the input is exhausted
         */
        @Override
        public Tuple next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("No more tuples to iterate over");
            }
            Tuple t = this.next;
            this.next = null;
            return t;
        }

        /** No-op: removal is not supported and the call is silently ignored. */
        @Override
        public void remove() {
        }
    }
}

