package org.apache.pig.impl.streaming;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.streaming.InputHandler;
import org.apache.pig.impl.streaming.OutputHandler;
import org.apache.pig.impl.util.UDFContext;

/* loaded from: input_file:org/apache/pig/impl/streaming/ExecutableManager.class */
public class ExecutableManager {
    // Exit status regarded as success. NOTE(review): the visible code compares
    // exit codes against the literal 0 rather than referencing this constant.
    private static final int SUCCESS = 0;
    // Streaming command taken from the POStream in configure().
    protected StreamingCommand command;
    // Child process; assigned in exec().
    Process process;
    // Wraps the child's output stream (i.e. what we write becomes its stdin);
    // assigned in run() on the non-asynchronous input path.
    protected DataOutputStream stdin;
    // Input-forwarding thread started by run() when input is not ASYNCHRONOUS.
    ProcessInputThread stdinThread;
    // Thread draining tuples from the output handler; started in exec() or close().
    ProcessOutputThread stdoutThread;
    // Buffered view of the child's standard output; assigned in exec() for SYNCHRONOUS output.
    InputStream stdout;
    // Thread draining the child's standard error; started in exec().
    ProcessErrorThread stderrThread;
    // Buffered view of the child's standard error; assigned in exec().
    InputStream stderr;
    // Handlers created by HandlerFactory in configure().
    InputHandler inputHandler;
    OutputHandler outputHandler;
    // Last Throwable caught by any worker thread. Volatile because it is written by
    // the worker threads and read by the input thread (before each write) and by close().
    protected volatile Throwable outerrThreadsError;
    // Operator whose binary input/output queues the worker threads use.
    private POStream poStream;
    // Input-forwarding thread started by run() when input is ASYNCHRONOUS.
    private ProcessInputThread fileInputThread;
    private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
    // Shared result (status byte 4, null payload) emitted when the process exits with code 0.
    // NOTE(review): 4 is presumably the end-of-stream status constant — confirm against POStatus.
    private static final Result EOS_RESULT = new Result((byte) 4, null);
    // Exit status of the child; holds -127 until a waitFor() call has returned.
    protected int exitCode = -127;
    // Counters updated by the input thread (input*) and the output thread (output*).
    // Byte counts are accumulated from Tuple.getMemorySize().
    protected long inputRecords = 0;
    protected long inputBytes = 0;
    protected long outputRecords = 0;
    protected long outputBytes = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pig/impl/streaming/ExecutableManager$ProcessErrorThread.class */
    public class ProcessErrorThread extends Thread {
        public ProcessErrorThread() {
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(ExecutableManager.this.stderr));
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    } else {
                        ExecutableManager.this.processError(readLine + "\n");
                    }
                }
                if (ExecutableManager.this.stderr != null) {
                    ExecutableManager.this.stderr.close();
                    ExecutableManager.LOG.debug("ProcessErrorThread done");
                }
            } catch (Throwable th) {
                ExecutableManager.this.outerrThreadsError = th;
                ExecutableManager.LOG.error(th);
                try {
                    if (ExecutableManager.this.stderr != null) {
                        ExecutableManager.this.stderr.close();
                    }
                } catch (IOException e) {
                    ExecutableManager.LOG.warn(e);
                }
                throw new RuntimeException(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/pig/impl/streaming/ExecutableManager$ProcessInputThread.class */
    class ProcessInputThread extends Thread {
        InputHandler inputHandler;
        private POStream poStream;
        private UDFContext udfContext;
        private BlockingQueue<Result> binaryInputQueue;

        ProcessInputThread(InputHandler inputHandler, POStream pOStream, UDFContext uDFContext) {
            setDaemon(true);
            this.inputHandler = inputHandler;
            this.poStream = pOStream;
            this.udfContext = uDFContext;
            this.binaryInputQueue = pOStream.getBinaryInputQueue();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (this.inputHandler.getInputType() == InputHandler.InputType.ASYNCHRONOUS && this.udfContext != null) {
                UDFContext.setUdfContext(this.udfContext);
            }
            while (true) {
                try {
                    Result take = this.binaryInputQueue.take();
                    synchronized (this.poStream) {
                        if (take != null) {
                            this.poStream.notifyAll();
                        }
                    }
                    if (take != null && take.returnStatus == 3) {
                        ExecutableManager.this.close();
                        return;
                    }
                    if (take != null && take.returnStatus == 0) {
                        if (ExecutableManager.this.outerrThreadsError != null) {
                            throw new IOException("Output/Error thread failed with: " + ExecutableManager.this.outerrThreadsError);
                        }
                        try {
                            Tuple tuple = (Tuple) take.result;
                            this.inputHandler.putNext(tuple);
                            ExecutableManager.this.inputBytes += tuple.getMemorySize();
                            ExecutableManager.this.inputRecords++;
                        } catch (IOException e) {
                            if (this.inputHandler.getInputType() == InputHandler.InputType.SYNCHRONOUS) {
                                ExecutableManager.LOG.warn("Exception while trying to write to stream binary's input", e);
                                ExecutableManager.this.close();
                                return;
                            } else {
                                ExecutableManager.LOG.error("Exception while trying to write to stream binary's input", e);
                                ExecutableManager.this.sendOutput(this.poStream.getBinaryOutputQueue(), new Result((byte) 2, "Exception while trying to write to stream binary's input" + e.getMessage()));
                                throw e;
                            }
                        }
                    }
                } catch (Throwable th) {
                    ExecutableManager.this.outerrThreadsError = th;
                    ExecutableManager.LOG.error("Error while reading from POStream and passing it to the streaming process", th);
                    try {
                        ExecutableManager.this.killProcess(ExecutableManager.this.process);
                        return;
                    } catch (IOException e2) {
                        ExecutableManager.LOG.warn(e2);
                        return;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pig/impl/streaming/ExecutableManager$ProcessOutputThread.class */
    public class ProcessOutputThread extends Thread {
        OutputHandler outputHandler;
        private BlockingQueue<Result> binaryOutputQueue;

        ProcessOutputThread(OutputHandler outputHandler, POStream pOStream) {
            setDaemon(true);
            this.outputHandler = outputHandler;
            this.binaryOutputQueue = pOStream.getBinaryOutputQueue();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Tuple next = this.outputHandler.getNext();
                    if (next == null) {
                        processOutput(null);
                        this.outputHandler.close();
                        return;
                    } else {
                        processOutput(next);
                        ExecutableManager.this.outputBytes += next.getMemorySize();
                    }
                } catch (Throwable th) {
                    ExecutableManager.this.outerrThreadsError = th;
                    ExecutableManager.LOG.error("Caught Exception in OutputHandler of Streaming binary, sending error signal to pipeline", th);
                    try {
                        Result result = new Result();
                        result.result = "Error reading output from Streaming binary:'" + ExecutableManager.this.command.toString() + "':" + th.getMessage();
                        result.returnStatus = (byte) 2;
                        ExecutableManager.this.sendOutput(this.binaryOutputQueue, result);
                        ExecutableManager.this.killProcess(ExecutableManager.this.process);
                        return;
                    } catch (Exception e) {
                        ExecutableManager.LOG.error("Error while trying to signal Error status to pipeline", e);
                        return;
                    }
                }
            }
        }

        void processOutput(Tuple tuple) {
            Result result = new Result();
            if (tuple != null) {
                result.result = tuple;
                result.returnStatus = (byte) 0;
                ExecutableManager.this.outputRecords++;
            } else {
                try {
                    ExecutableManager.this.exitCode = ExecutableManager.this.process.waitFor();
                    if (ExecutableManager.this.exitCode == 0) {
                        result = ExecutableManager.EOS_RESULT;
                    } else {
                        String str = "'" + ExecutableManager.this.command.toString() + "' failed with exit status: " + ExecutableManager.this.exitCode;
                        ExecutableManager.LOG.error(str);
                        result.result = str;
                        result.returnStatus = (byte) 2;
                    }
                } catch (InterruptedException e) {
                    try {
                        ExecutableManager.this.killProcess(ExecutableManager.this.process);
                    } catch (IOException e2) {
                        ExecutableManager.LOG.warn("Exception trying to kill process while processing null output from binary", e2);
                    }
                    String str2 = "Failure while waiting for process (" + ExecutableManager.this.command.toString() + ")" + e.getMessage();
                    ExecutableManager.LOG.error(str2, e);
                    result.result = str2;
                    result.returnStatus = (byte) 2;
                    ExecutableManager.this.sendOutput(this.binaryOutputQueue, result);
                    return;
                }
            }
            ExecutableManager.this.sendOutput(this.binaryOutputQueue, result);
        }
    }

    /**
     * Binds this manager to a stream operator: remembers the operator, takes
     * its command, and asks {@code HandlerFactory} for the matching input and
     * output handlers.
     *
     * @param pOStream operator whose command is to be executed
     * @throws IOException   declared for handler creation failures
     * @throws ExecException declared for handler creation failures
     */
    public void configure(POStream pOStream) throws IOException, ExecException {
        poStream = pOStream;
        command = poStream.getCommand();
        inputHandler = HandlerFactory.createInputHandler(command);
        outputHandler = HandlerFactory.createOutputHandler(command);
    }

    /**
     * Shuts down the streaming pipeline for this manager.
     *
     * <p>Steps, in order: closes the input handler; if the input type is
     * ASYNCHRONOUS, starts the process now via {@link #exec()}; waits for the
     * process and stores its exit code; joins the stdout and stderr reader
     * threads; logs the exit status; and, if the output type is ASYNCHRONOUS,
     * binds the output handler and starts a fresh output thread. An interrupt
     * during any wait is logged and answered by killing the process.
     *
     * @throws IOException if closing the handlers, starting the process, or
     *                     killing the process fails
     */
    public void close() throws IOException {
        this.inputHandler.close(this.process);
        if (this.inputHandler.getInputType() == InputHandler.InputType.ASYNCHRONOUS) {
            // Asynchronous input: the process is only launched once all input has been handed over.
            exec();
        }
        try {
            this.exitCode = this.process.waitFor();
        } catch (InterruptedException e) {
            LOG.error("Unexpected exception while waiting for streaming binary to complete", e);
            killProcess(this.process);
        }
        try {
            if (this.stdoutThread != null) {
                this.stdoutThread.join(0L);
            }
            this.stdoutThread = null;
        } catch (InterruptedException e2) {
            LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", e2);
            killProcess(this.process);
        }
        try {
            if (this.stderrThread != null) {
                this.stderrThread.join(0L);
            }
            this.stderrThread = null;
        } catch (InterruptedException e3) {
            // This join is on the stderr reader, so the message names the error thread
            // (it previously said "input thread").
            LOG.error("Unexpected exception while waiting for error thread for streaming binary to complete", e3);
            killProcess(this.process);
        }
        LOG.debug("Process exited with: " + this.exitCode);
        if (this.exitCode != 0) {
            LOG.error(this.command + " failed with exit status: " + this.exitCode);
        }
        if (this.outputHandler.getOutputType() == OutputHandler.OutputType.ASYNCHRONOUS) {
            // Asynchronous output: bind without a stream and let a new thread drain the handler.
            this.outputHandler.bindTo("", null, 0L, -1L);
            this.stdoutThread = new ProcessOutputThread(this.outputHandler, this.poStream);
            this.stdoutThread.start();
        }
        if (this.outerrThreadsError != null) {
            LOG.error("Output/Error thread failed with: " + this.outerrThreadsError);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Forcibly tears down a streaming process: closes the input handler,
     * closes the output handler, and destroys the process. Does nothing when
     * {@code process} is {@code null}.
     *
     * <p>The process is destroyed even if closing a handler throws, and the
     * output handler is still closed if closing the input handler throws, so
     * a failing close can no longer leave the child process running.
     *
     * @param process the process to kill; may be {@code null}
     * @throws IOException if closing either handler fails
     */
    public void killProcess(Process process) throws IOException {
        if (process == null) {
            return;
        }
        try {
            try {
                this.inputHandler.close(process);
            } finally {
                this.outputHandler.close();
            }
        } finally {
            // Always reached, regardless of handler failures above.
            process.destroy();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Launches the streaming process and wires up its output side.
     *
     * <p>Always starts a {@link ProcessErrorThread} on the process's error
     * stream. When the output type is SYNCHRONOUS, additionally binds the
     * output handler to the process's standard output and starts a
     * {@link ProcessOutputThread}.
     *
     * @throws IOException if the process cannot be started or the handler
     *                     cannot be bound
     */
    public void exec() throws IOException {
        process = StreamingUtil.createProcess(command).start();
        LOG.debug("Started the process for command: " + command);

        stderr = new DataInputStream(new BufferedInputStream(process.getErrorStream()));
        stderrThread = new ProcessErrorThread();
        stderrThread.start();

        if (outputHandler.getOutputType() != OutputHandler.OutputType.SYNCHRONOUS) {
            return;
        }

        final InputStream childStdout = new DataInputStream(new BufferedInputStream(process.getInputStream()));
        stdout = childStdout;
        outputHandler.bindTo("", new BufferedPositionedInputStream(childStdout), 0L, Long.MAX_VALUE);
        stdoutThread = new ProcessOutputThread(outputHandler, poStream);
        stdoutThread.start();
    }

    /**
     * Starts feeding input to the streaming command.
     *
     * <p>For ASYNCHRONOUS input only an input thread is started (carrying the
     * current {@link UDFContext}); the process itself is not launched here.
     * Otherwise the process is launched, the input handler is bound to the
     * process's standard input, and an input thread is started without a
     * UDF context.
     *
     * @throws IOException if launching the process or binding the handler fails
     */
    public void run() throws IOException {
        final boolean asynchronousInput =
                inputHandler.getInputType() == InputHandler.InputType.ASYNCHRONOUS;

        if (!asynchronousInput) {
            exec();
            stdin = new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
            inputHandler.bindTo(stdin);
            stdinThread = new ProcessInputThread(inputHandler, poStream, null);
            stdinThread.start();
            return;
        }

        fileInputThread = new ProcessInputThread(inputHandler, poStream, UDFContext.getUDFContext());
        fileInputThread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Puts {@code result} on the given queue (blocking until accepted) and
     * then notifies every thread waiting on the POStream's monitor. An
     * interrupt while enqueueing is logged and otherwise ignored; the
     * notification is still issued afterwards.
     *
     * @param blockingQueue queue that receives the result
     * @param result        result to publish; the notification is skipped
     *                      when it is {@code null}
     */
    public void sendOutput(BlockingQueue<Result> blockingQueue, Result result) {
        try {
            blockingQueue.put(result);
        } catch (InterruptedException interrupted) {
            LOG.error("Error while sending binary output to POStream", interrupted);
        }
        synchronized (poStream) {
            if (result == null) {
                return;
            }
            poStream.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles one chunk of the process's standard-error output by printing it
     * verbatim to this JVM's {@code System.err}. {@link ProcessErrorThread}
     * calls this once per line, with the newline already appended, so no
     * extra line terminator is added here.
     *
     * @param str text to emit
     */
    public void processError(String str) {
        System.err.print(str);
    }
}
