package org.apache.nifi.distributed.cache.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.VersionNegotiator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.class */
public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
    private static final int PROTOCOL_UNINITIALIZED = 0;
    private static final byte[] MAGIC_HEADER = {78, 105, 70, 105};
    private final ChannelPromise promiseHandshakeComplete;
    private final VersionNegotiator versionNegotiator;
    private final long timeoutMillis;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicInteger protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);

    public CacheClientHandshakeHandler(Channel channel, VersionNegotiator versionNegotiator, long j) {
        this.promiseHandshakeComplete = channel.newPromise();
        this.versionNegotiator = versionNegotiator;
        this.timeoutMillis = j;
    }

    public void waitHandshakeComplete() {
        this.promiseHandshakeComplete.awaitUninterruptibly(this.timeoutMillis, TimeUnit.MILLISECONDS);
        if (this.promiseHandshakeComplete.isSuccess()) {
            return;
        }
        this.promiseHandshakeComplete.setFailure(new HandshakeException("Handshake timed out before completion."));
    }

    public VersionNegotiator getVersionNegotiator() {
        return this.versionNegotiator;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws IOException {
        channelHandlerContext.write(Unpooled.wrappedBuffer(MAGIC_HEADER));
        this.logger.debug("Magic header written");
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(new OutboundAdapter().write(this.versionNegotiator.getVersion()).toBytes()));
        this.logger.debug("Protocol version {} proposed", Integer.valueOf(this.versionNegotiator.getVersion()));
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (this.promiseHandshakeComplete.isSuccess()) {
            channelHandlerContext.fireChannelRead(obj);
            return;
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            try {
                processHandshake(channelHandlerContext, byteBuf);
                byteBuf.release();
            } catch (IOException | HandshakeException e) {
                throw new IllegalStateException("Handshake Processing Failed", e);
            }
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    private void processHandshake(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws HandshakeException, IOException {
        short readUnsignedByte = byteBuf.readUnsignedByte();
        if (readUnsignedByte == 20) {
            this.logger.debug("Protocol version {} accepted", Integer.valueOf(this.versionNegotiator.getVersion()));
            this.protocol.set(this.versionNegotiator.getVersion());
            return;
        }
        if (readUnsignedByte != 21) {
            if (readUnsignedByte != 255) {
                throw new HandshakeException("Unknown handshake signal: " + readUnsignedByte);
            }
            byte[] bArr = new byte[byteBuf.readShort()];
            byteBuf.readBytes(bArr);
            throw new HandshakeException("Remote destination aborted connection with message: " + new String(bArr, StandardCharsets.UTF_8));
        }
        int readInt = byteBuf.readInt();
        this.logger.debug("Received protocol version {} counter proposal", Integer.valueOf(readInt));
        Integer preferredVersion = this.versionNegotiator.getPreferredVersion(readInt);
        Optional.ofNullable(preferredVersion).orElseThrow(() -> {
            return new HandshakeException(String.format("Received unsupported protocol version proposal [%d]", Integer.valueOf(readInt)));
        });
        this.versionNegotiator.setVersion(preferredVersion.intValue());
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(new OutboundAdapter().write(preferredVersion.intValue()).toBytes()));
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        if (this.promiseHandshakeComplete.isSuccess()) {
            channelHandlerContext.fireChannelReadComplete();
        } else if (this.protocol.get() > 0) {
            this.promiseHandshakeComplete.setSuccess();
        }
    }

    public boolean isSuccess() {
        return this.promiseHandshakeComplete.isSuccess();
    }

    public Throwable cause() {
        return this.promiseHandshakeComplete.cause();
    }
}
