package org.apache.hadoop.hive.llap;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/llap/TestLlapOutputFormat.class */
public class TestLlapOutputFormat {
    private static final Logger LOG = LoggerFactory.getLogger(TestLlapOutputFormat.class);
    private static LlapOutputFormatService service;

    @BeforeClass
    public static void setUp() throws Exception {
        LOG.debug("Setting up output service");
        Configuration configuration = new Configuration();
        HiveConf.setIntVar(configuration, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
        LlapOutputFormatService.initializeAndStart(configuration, (SecretManager) null);
        service = LlapOutputFormatService.get();
        LlapProxy.setDaemon(true);
        LOG.debug("Output service up");
    }

    @AfterClass
    public static void tearDown() throws IOException, InterruptedException {
        LOG.debug("Tearing down service");
        service.stop();
        LOG.debug("Tearing down complete");
    }

    @Test
    public void testValues() throws Exception {
        JobConf jobConf = new JobConf();
        for (int i = 0; i < 5; i++) {
            String str = "foobar" + i;
            jobConf.set("llap.of.id", str);
            LlapOutputFormat llapOutputFormat = new LlapOutputFormat();
            new HiveConf();
            Socket socket = new Socket("localhost", service.getPort());
            LOG.debug("Socket connected");
            OutputStream outputStream = socket.getOutputStream();
            LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder().setFragmentId(str).build().writeDelimitedTo(outputStream);
            outputStream.flush();
            Thread.sleep(3000L);
            LOG.debug("Data written");
            RecordWriter recordWriter = llapOutputFormat.getRecordWriter((FileSystem) null, jobConf, (String) null, (Progressable) null);
            Text text = new Text();
            LOG.debug("Have record writer");
            for (int i2 = 0; i2 < 10; i2++) {
                text.set(i2);
                recordWriter.write(NullWritable.get(), text);
            }
            recordWriter.close((Reporter) null);
            LlapBaseRecordReader llapBaseRecordReader = new LlapBaseRecordReader(socket.getInputStream(), (Schema) null, Text.class, jobConf, (Closeable) null, (Closeable) null);
            LOG.debug("Have record reader");
            llapBaseRecordReader.handleEvent(LlapBaseRecordReader.ReaderEvent.doneEvent());
            int i3 = 0;
            while (llapBaseRecordReader.next(NullWritable.get(), text)) {
                LOG.debug(text.toString());
                i3++;
            }
            llapBaseRecordReader.close();
            Assert.assertEquals(10L, i3);
        }
    }

    @Test
    public void testBadClientMessage() throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.set("llap.of.id", "foobar");
        LlapOutputFormat llapOutputFormat = new LlapOutputFormat();
        Socket socket = new Socket("localhost", service.getPort());
        LOG.debug("Socket connected");
        OutputStream outputStream = socket.getOutputStream();
        LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder().setFragmentId("foobar").build().writeDelimitedTo(outputStream);
        LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder().setFragmentId("foobar").build().writeDelimitedTo(outputStream);
        outputStream.flush();
        Thread.sleep(3000L);
        LOG.debug("Data written");
        try {
            llapOutputFormat.getRecordWriter((FileSystem) null, jobConf, (String) null, (Progressable) null);
            Assert.fail("Didn't throw");
        } catch (IOException e) {
        }
    }
}
