/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import hive.com.google.common.base.Preconditions;
import hive.com.google.protobuf.MessageLite;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ChannelOutputStream;
import org.apache.hadoop.hive.llap.LlapArrowRecordWriter;
import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.llap.LlapRecordWriter;
import org.apache.hadoop.hive.llap.WritableByteChannelAdapter;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based server that lets external readers attach to the output of a fragment.
 *
 * <p>A client connects to the service port and sends a single
 * {@code LlapOutputSocketInitMessage} carrying a fragment id (and optionally a token).
 * The service then creates a {@link RecordWriter} that writes to that connection and
 * registers it under the fragment id, where {@link #getWriter(String)} can pick it up.
 *
 * <p>Thread-safety: the {@code writers} and {@code errors} maps are plain
 * {@link HashMap}s and must only be touched while holding {@code lock}. The same monitor
 * is used to wake up threads blocked in {@link #getWriter(String)}.
 */
public class LlapOutputFormatService {
    // NOTE(review): the logger is registered under LlapOutputFormat rather than this
    // class. Left unchanged so existing log configuration keeps working; confirm intent.
    private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class);
    /** Set once the singleton has been started successfully. */
    private static final AtomicBoolean started = new AtomicBoolean(false);
    /** Set while (and after) a thread has claimed initialization of the singleton. */
    private static final AtomicBoolean initing = new AtomicBoolean(false);
    /** Upper bound, in seconds, for the graceful shutdown of the event loop group. */
    private static final int WAIT_TIME = 5;
    private static LlapOutputFormatService INSTANCE;

    /** Guards {@link #writers} and {@link #errors}; also the wait/notify monitor. */
    private final Object lock = new Object();
    /** Registered writers keyed by fragment id. Guarded by {@link #lock}. */
    private final Map<String, RecordWriter<?, ?>> writers = new HashMap<>();
    /** Registration failures keyed by fragment id. Guarded by {@link #lock}. */
    private final Map<String, String> errors = new HashMap<>();
    private final Configuration conf;
    private EventLoopGroup eventLoopGroup;
    private ServerBootstrap serverBootstrap;
    private ChannelFuture listeningChannelFuture;
    private int port;
    /** Verifies client tokens; when {@code null}, token verification is skipped. */
    private final SecretManager sm;
    /** Maximum time {@link #getWriter(String)} waits for a writer to be registered. */
    private final long writerTimeoutMs;

    private LlapOutputFormatService(Configuration conf, SecretManager sm) throws IOException {
        this.sm = sm;
        this.conf = conf;
        this.writerTimeoutMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates and starts the singleton instance.
     *
     * <p>Only the first caller performs the initialization; callers that arrive while it
     * is in progress, or after it has succeeded, return immediately without waiting. If
     * the initialization fails, the claim is released so that a later call can retry
     * instead of silently doing nothing.
     *
     * @param conf configuration the service reads its port, buffer and timeout settings from
     * @param sm   secret manager used to verify client tokens, or {@code null} to skip verification
     * @throws Exception if the service cannot be created or started
     */
    public static void initializeAndStart(Configuration conf, SecretManager sm) throws Exception {
        if (initing.getAndSet(true)) {
            return;
        }
        boolean succeeded = false;
        try {
            LlapOutputFormatService service = new LlapOutputFormatService(conf, sm);
            service.start();
            // Publish only a fully started instance; 'started' is what get() checks.
            INSTANCE = service;
            started.set(true);
            succeeded = true;
        } finally {
            if (!succeeded) {
                // Without this reset a failed start would block every future attempt.
                initing.set(false);
            }
        }
    }

    /**
     * Returns the started singleton.
     *
     * @return the service instance
     * @throws IllegalStateException if {@link #initializeAndStart} has not completed successfully
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public static LlapOutputFormatService get() throws IOException {
        Preconditions.checkState(started.get(), "LlapOutputFormatService must be started before invoking get");
        return INSTANCE;
    }

    /**
     * Binds the server socket to the configured port and starts accepting connections.
     *
     * <p>The port actually bound is recorded and available through {@link #getPort()}.
     * If binding fails for any reason, the event loop group created here is shut down
     * again so its threads are not leaked.
     *
     * @throws IOException if the thread is interrupted while binding (the interrupt
     *                     status is restored before throwing)
     */
    public void start() throws IOException {
        LOG.info("Starting LlapOutputFormatService");
        int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
        int sendBufferSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE);
        eventLoopGroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(eventLoopGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler(sendBufferSize));
        boolean bound = false;
        try {
            listeningChannelFuture = serverBootstrap.bind(portFromConf).sync();
            port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort();
            bound = true;
            LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", port, sendBufferSize);
        } catch (InterruptedException err) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err);
        } finally {
            if (!bound) {
                // Bind failed or was interrupted: release the event loop threads.
                eventLoopGroup.shutdownGracefully();
                eventLoopGroup = null;
                listeningChannelFuture = null;
            }
        }
    }

    /**
     * Closes the listening channel and shuts down the event loop group, waiting for both.
     *
     * <p>Safe to call when the service was never started successfully; in that case a
     * warning is logged and nothing else happens.
     *
     * @throws IOException          declared for callers; not thrown by the current implementation
     * @throws InterruptedException if interrupted while waiting for the shutdown to finish
     */
    public void stop() throws IOException, InterruptedException {
        LOG.info("Stopping LlapOutputFormatService");
        if (listeningChannelFuture != null) {
            listeningChannelFuture.channel().close().sync();
            listeningChannelFuture = null;
        } else {
            LOG.warn("LlapOutputFormatService does not appear to have a listening port to close.");
        }
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully(1L, WAIT_TIME, TimeUnit.SECONDS).sync();
        }
    }

    /**
     * Returns the writer registered for {@code id}, blocking until one is available.
     *
     * <p>The call ends early with an {@link IOException} if a registration failure was
     * recorded for the id, or if no writer shows up within the configured timeout. The
     * timeout bounds the total wait: after each wakeup only the remaining time is waited.
     *
     * @param id  fragment id the writer was registered under
     * @param <K> key type expected by the caller (not checked at runtime)
     * @param <V> value type expected by the caller (not checked at runtime)
     * @return the writer bound to the client connection for {@code id}
     * @throws IOException          if registration failed for {@code id} or the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public <K, V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
        RecordWriter<?, ?> writer = null;
        synchronized (lock) {
            long startTime = System.nanoTime();
            boolean isFirst = true;
            while ((writer = writers.get(id)) == null) {
                String error = errors.remove(id);
                if (error != null) {
                    throw new IOException(error);
                }
                if (isFirst) {
                    LOG.info("Waiting for writer for {}", id);
                    isFirst = false;
                }
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
                if (elapsedMs > writerTimeoutMs) {
                    throw new IOException("The writer for " + id + " has timed out after " + writerTimeoutMs + "ms");
                }
                // Wait only for what is left of the budget. Object.wait(0) means
                // "wait forever", so never pass 0 even when the budget is exhausted.
                lock.wait(Math.max(1L, writerTimeoutMs - elapsedMs));
            }
        }
        LOG.info("Returning writer for: {}", id);
        // The map stores writers with wildcard types; the caller chooses K and V.
        @SuppressWarnings("unchecked")
        RecordWriter<K, V> typedWriter = (RecordWriter<K, V>) writer;
        return typedWriter;
    }

    /**
     * Returns the port the service bound to in {@link #start()}.
     *
     * @return the bound port, or 0 if the service has not been started
     */
    public int getPort() {
        return port;
    }

    /**
     * Builds the pipeline for each accepted connection: varint32 frame decoding, protobuf
     * decoding of the init message, string encoding for outbound text, and the handler
     * that registers the writer.
     */
    protected class LlapOutputFormatServiceChannelHandler
    extends ChannelInitializer<SocketChannel> {
        private final int sendBufferSize;

        public LlapOutputFormatServiceChannelHandler(int sendBufferSize) {
            this.sendBufferSize = sendBufferSize;
        }

        /** Installs the decoders, the encoder and the registration handler on {@code ch}. */
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(
                new ProtobufVarint32FrameDecoder(),
                new ProtobufDecoder(LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.getDefaultInstance()),
                new StringEncoder(),
                new LlapOutputFormatServiceHandler(sendBufferSize));
        }
    }

    /**
     * Removes the writer registered for a fragment id once its channel has been closed.
     */
    protected class LlapOutputFormatChannelCloseListener
    implements ChannelFutureListener {
        private final String id;

        LlapOutputFormatChannelCloseListener(String id) {
            this.id = id;
        }

        /** Unregisters the writer for this listener's id; logs a warning if none was found. */
        public void operationComplete(ChannelFuture future) throws Exception {
            RecordWriter<?, ?> writer;
            // Must use the same monitor as every other access to the writers map.
            synchronized (lock) {
                writer = writers.remove(id);
            }
            if (writer == null) {
                LOG.warn("Did not find a writer for ID {}", id);
            }
        }
    }

    /**
     * Handles the init message of a connection: verifies the token when a secret manager
     * is configured, then creates and registers the writer for the announced fragment id.
     */
    protected class LlapOutputFormatServiceHandler
    extends SimpleChannelInboundHandler<LlapDaemonProtocolProtos.LlapOutputSocketInitMessage> {
        private final int sendBufferSize;

        public LlapOutputFormatServiceHandler(int sendBufferSize) {
            this.sendBufferSize = sendBufferSize;
        }

        /**
         * Registers a writer for the fragment id in {@code msg}. Any failure closes the
         * channel and is recorded so that a thread waiting in {@code getWriter} is notified.
         */
        public void channelRead0(ChannelHandlerContext ctx, LlapDaemonProtocolProtos.LlapOutputSocketInitMessage msg) {
            String id = msg.getFragmentId();
            byte[] tokenBytes = msg.hasToken() ? msg.getToken().toByteArray() : null;
            try {
                registerReader(ctx, id, tokenBytes);
            } catch (Throwable t) {
                failChannel(ctx, id, StringUtils.stringifyException(t));
            }
        }

        /**
         * Verifies the token (if a secret manager is configured), builds the writer for the
         * connection and registers it under {@code id}, waking up any waiting threads.
         * If a writer is already registered for {@code id}, the channel is failed instead.
         */
        private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBytes) {
            if (sm != null) {
                try {
                    sm.verifyToken(tokenBytes);
                } catch (IOException | SecurityException ex) {
                    failChannel(ctx, id, ex.getMessage());
                    return;
                }
            }
            LOG.debug("registering socket for: {}", id);
            int maxPendingWrites = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
            boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
            RecordWriter<?, ?> writer;
            if (useArrow) {
                writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
            } else {
                writer = new LlapRecordWriter(id, new ChunkedOutputStream(new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), sendBufferSize, id));
            }
            boolean alreadyRegistered;
            synchronized (lock) {
                alreadyRegistered = writers.containsKey(id);
                if (!alreadyRegistered) {
                    writers.put(id, writer);
                    // Drop the registration again when the client connection goes away.
                    ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id));
                    lock.notifyAll();
                }
            }
            if (alreadyRegistered) {
                failChannel(ctx, id, "Writer already registered for " + id);
            }
        }

        /**
         * Closes the channel and records {@code error} for {@code id} so that a thread
         * waiting in {@code getWriter} fails fast instead of running into the timeout.
         */
        private void failChannel(ChannelHandlerContext ctx, String id, String error) {
            // A null message would be stored as "no error" and leave the waiter blocked.
            String message = error != null ? error : "Unknown error while registering writer for " + id;
            ctx.close();
            synchronized (lock) {
                errors.put(id, message);
                lock.notifyAll();
            }
            LOG.error(message);
        }
    }
}

