/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.event.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.EventSenderFactory;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.EventLoopGroupFactory;
import org.apache.nifi.event.transport.netty.NettyEventSender;
import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
import org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer;

public class NettyEventSenderFactory<T>
extends EventLoopGroupFactory
implements EventSenderFactory<T> {
    private static final int MAX_PENDING_ACQUIRES = 1024;
    private Integer socketSendBufferSize = null;
    private final String address;
    private final int port;
    private final TransportProtocol protocol;
    private Duration timeout = Duration.ofSeconds(30L);
    private Duration idleTimeout = Duration.ofSeconds(0L);
    private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
    private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
    private SSLContext sslContext;
    private boolean singleEventPerConnection = false;
    private Duration shutdownQuietPeriod = ShutdownQuietPeriod.DEFAULT.getDuration();
    private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();

    public NettyEventSenderFactory(String address, int port, TransportProtocol protocol) {
        this.address = address;
        this.port = port;
        this.protocol = protocol;
    }

    public void setSocketSendBufferSize(Integer socketSendBufferSize) {
        this.socketSendBufferSize = socketSendBufferSize;
    }

    public void setHandlerSupplier(Supplier<List<ChannelHandler>> handlerSupplier) {
        this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
    }

    public void setSslContext(SSLContext sslContext) {
        this.sslContext = sslContext;
    }

    public void setTimeout(Duration timeout) {
        this.timeout = Objects.requireNonNull(timeout, "Timeout required");
    }

    public void setIdleTimeout(Duration idleTimeout) {
        this.idleTimeout = Objects.requireNonNull(idleTimeout, "Timeout required");
    }

    public void setShutdownQuietPeriod(Duration quietPeriod) {
        this.shutdownQuietPeriod = quietPeriod;
    }

    public void setShutdownTimeout(Duration timeout) {
        this.shutdownTimeout = timeout;
    }

    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    public void setSingleEventPerConnection(boolean singleEventPerConnection) {
        this.singleEventPerConnection = singleEventPerConnection;
    }

    @Override
    public EventSender<T> getEventSender() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.remoteAddress((SocketAddress)new InetSocketAddress(this.address, this.port));
        EventLoopGroup group = this.getEventLoopGroup();
        bootstrap.group(group);
        if (TransportProtocol.UDP.equals((Object)this.protocol)) {
            bootstrap.channel(NioDatagramChannel.class);
        } else {
            bootstrap.channel(NioSocketChannel.class);
        }
        this.setChannelOptions(bootstrap);
        return this.getConfiguredEventSender(bootstrap);
    }

    private void setChannelOptions(Bootstrap bootstrap) {
        int timeoutMilliseconds = (int)this.timeout.toMillis();
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)timeoutMilliseconds);
        if (this.socketSendBufferSize != null) {
            bootstrap.option(ChannelOption.SO_SNDBUF, (Object)this.socketSendBufferSize);
        }
    }

    private EventSender<T> getConfiguredEventSender(Bootstrap bootstrap) {
        SocketAddress remoteAddress = bootstrap.config().remoteAddress();
        ChannelPool channelPool = this.getChannelPool(bootstrap);
        return new NettyEventSender(bootstrap.config().group(), channelPool, remoteAddress, this.singleEventPerConnection, this.shutdownQuietPeriod, this.shutdownTimeout);
    }

    private ChannelPool getChannelPool(Bootstrap bootstrap) {
        ChannelInitializer<Channel> channelInitializer = this.getChannelInitializer();
        InitializingChannelPoolHandler handler = new InitializingChannelPoolHandler(channelInitializer);
        return new FixedChannelPool(bootstrap, (ChannelPoolHandler)handler, ChannelHealthChecker.ACTIVE, FixedChannelPool.AcquireTimeoutAction.FAIL, this.timeout.toMillis(), this.maxConnections, 1024);
    }

    private ChannelInitializer<Channel> getChannelInitializer() {
        StandardChannelInitializer<Channel> channelInitializer = this.sslContext == null ? new StandardChannelInitializer<Channel>(this.handlerSupplier) : new ClientSslStandardChannelInitializer(this.handlerSupplier, this.sslContext);
        channelInitializer.setWriteTimeout(this.timeout);
        channelInitializer.setIdleTimeout(this.idleTimeout);
        return channelInitializer;
    }
}

