package kafka.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Properties;
import java.util.Random;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import kafka.api.ProducerRequest;
import kafka.network.RequestChannel;
import kafka.producer.SyncProducerConfig$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SystemTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: SocketServerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005d\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU8dW\u0016$8+\u001a:wKJ$Vm\u001d;\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!A\u0003&V]&$8+^5uK\")1\u0003\u0001C\u0001)\u00051A(\u001b8jiz\"\u0012!\u0006\t\u0003-\u0001i\u0011A\u0001\u0005\b1\u0001\u0011\r\u0011\"\u0001\u001a\u0003\u0015\u0001(o\u001c9t+\u0005Q\u0002CA\u000e!\u001b\u0005a\"BA\u000f\u001f\u0003\u0011)H/\u001b7\u000b\u0003}\tAA[1wC&\u0011\u0011\u0005\b\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bBB\u0012\u0001A\u0003%!$\u0001\u0004qe>\u00048\u000f\t\u0005\bK\u0001\u0011\r\u0011\"\u0001'\u0003\u0019\u0019wN\u001c4jOV\tq\u0005\u0005\u0002)W5\t\u0011F\u0003\u0002+\t\u000511/\u001a:wKJL!\u0001L\u0015\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0007]\u0001\u0001\u000b\u0011B\u0014\u0002\u000f\r|gNZ5hA!9\u0001\u0007\u0001b\u0001\n\u0003\t\u0014aB7fiJL7m]\u000b\u0002eA\u00111GO\u0007\u0002i)\u0011\u0001'\u000e\u0006\u0003m]\naaY8n[>t'BA\u00039\u0015\tId\"\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003wQ\u0012q!T3ue&\u001c7\u000f\u0003\u0004>\u0001\u0001\u0006IAM\u0001\t[\u0016$(/[2tA!9!\u0006\u0001b\u0001\n\u0003yT#\u0001!\u0011\u0005Y\t\u0015B\u0001\"\u0003\u00051\u0019vnY6fiN+'O^3s\u0011\u0019!\u0005\u0001)A\u0005\u0001\u000691/\u001a:wKJ\u0004\u0003\"\u0002$\u0001\t\u00039\u0015aC:f]\u0012\u0014V-];fgR$B\u0001\u0013(W7B\u0011\u0011\nT\u0007\u0002\u0015*\t1*A\u0003tG\u0006d\u0017-\u0003\u0002N\u0015\n!QK\\5u\u0011\u0015yU\t1\u0001Q\u0003\u0019\u0019xnY6fiB\u0011\u0011\u000bV\u0007\u0002%*\u00111KH\u0001\u0004]\u0016$\u0018BA+S\u0005\u0019\u0019vnY6fi\")q+\u0012a\u00011\u0006\u0011\u0011\u000e\u001a\t\u0003\u0013fK!A\u0017&\u0003\u000bMCwN\u001d;\t\u000bq+\u0005\u0019A/\u0002\u000fI,\u0017/^3tiB\u0019\u0011J\u00181\n\u0005}S%!B!se\u0006L\bCA%b\u0013\t\u0011'J\u0001\u0003CsR,\u0007\"\u00023\u0001\t\u0003)\u0017a\u0004:fG\u0016Lg/\u001a*fgB|gn]3\u0015\u0005u3\u0007\"B(d\u0001\u0004\u0001\u0006\"\u00025\u0001\t\u0003I\u0017A\u00049s_\u000e,7o\u001d*fcV,7\u000f\u001e\u000b\u0003\u0011*DQa[4A\u00021\fqa\u00195b]:,G\u000e\u0005\u0002\u0017[&\u0011aN\u0001\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011\u0015\u0001\b\u0001\"\u0001r\u0003\u001d\u0019wN\u001c8fGR$2\u0001\u0015:u\u0011\u001d\u0019x\u000e%AA\u0002\u0001\u000b\u0011a\u001d\u0005\bk>\u0004\n\u00111\u0001w\u0003!\u0001(o\u001c;pG>d\u0007CA<z\u001b\u0005A(BA;6\u0013\tQ\bP\u0001\tTK\u000e,(/\u001b;z!J|Go\\2pY\")A\u0010\u0001C\u0001{\u000691\r\\3b]V\u0004H#\u0001%)\u0005m|\b\u0003BA\u0001\u0003\u000bi!!a\u0001\u000b\u0005-q\u0011\u0002BA\u0004\u0003\u0007\u0011Q!\u00114uKJDq!a\u0003\u0001\t\u0013\ti!\u0001\u000bqe>$WoY3s%\u0016\fX/Z:u\u0005f$Xm]\u000b\u0002;\"1\u0011\u0011\u0003\u0001\u0005\u0002u\fQb]5na2,'+Z9vKN$\b\u0006BA\b\u0003+\u0001B!!\u0001\u0002\u0018%!\u0011\u0011DA\u0002\u0005\u0011!Vm\u001d;\t\r\u0005u\u0001\u0001\"\u0001~\u0003]!xn\u001c\"jOJ+\u0017/^3ti&\u001b(+\u001a6fGR,G\r\u000b\u0003\u0002\u001c\u0005U\u0001BBA\u0012\u0001\u0011\u0005Q0\u0001\u000euKN$8k\\2lKR\u001c8\t\\8tK>s7\u000b[;uI><h\u000e\u000b\u0003\u0002\"\u0005U\u0001BBA\u0015\u0001\u0011\u0005Q0A\fuKN$X*\u0019=D_:tWm\u0019;j_:\u001c\b+\u001a:Ja\"\"\u0011qEA\u000b\u0011\u0019\ty\u0003\u0001C\u0001{\u0006\u0001C/Z:u\u001b\u0006D8i\u001c8oK\u000e$\u0018n\u001c8t!\u0016\u0014\u0018\nU(wKJ\u0014\u0018\u000eZ3tQ\u0011\ti#!\u0006\t\r\u0005U\u0002\u0001\"\u0001~\u0003M!Xm\u001d;Tg2\u001cvnY6fiN+'O^3sQ\u0011\t\u0019$!\u0006\t\r\u0005m\u0002\u0001\"\u0001~\u0003Q!Xm\u001d;TKN\u001c\u0018n\u001c8Qe&t7-\u001b9bY\"\"\u0011\u0011HA\u000b\u0011%\t\t\u0005AI\u0001\n\u0003\t\u0019%A\td_:tWm\u0019;%I\u00164\u0017-\u001e7uIE*\"!!\u0012+\u0007\u0001\u000b9e\u000b\u0002\u0002JA!\u00111JA+\u001b\t\tiE\u0003\u0003\u0002P\u0005E\u0013!C;oG\",7m[3e\u0015\r\t\u0019FS\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA,\u0003\u001b\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0011%\tY\u0006AI\u0001\n\u0003\ti&A\td_:tWm\u0019;%I\u00164\u0017-\u001e7uII*\"!a\u0018+\u0007Y\f9\u0005")
/* loaded from: input_file:kafka/network/SocketServerTest.class */
public class SocketServerTest extends JUnitSuite {
    private final Properties props = TestUtils$.MODULE$.createBrokerConfig(0, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), 0, TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14());
    private final KafkaConfig config;
    private final Metrics metrics;
    private final SocketServer server;

    public Properties props() {
        return this.props;
    }

    public KafkaConfig config() {
        return this.config;
    }

    public Metrics metrics() {
        return this.metrics;
    }

    public SocketServer server() {
        return this.server;
    }

    public void sendRequest(Socket socket, short s, byte[] bArr) {
        DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
        dataOutputStream.writeInt(bArr.length + 2);
        dataOutputStream.writeShort(s);
        dataOutputStream.write(bArr);
        dataOutputStream.flush();
    }

    public byte[] receiveResponse(Socket socket) {
        DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
        byte[] bArr = new byte[dataInputStream.readInt()];
        dataInputStream.readFully(bArr);
        return bArr;
    }

    public void processRequest(RequestChannel requestChannel) {
        RequestChannel.Request receiveRequest = requestChannel.receiveRequest();
        ByteBuffer allocate = ByteBuffer.allocate(receiveRequest.requestObj().sizeInBytes());
        receiveRequest.requestObj().writeTo(allocate);
        allocate.rewind();
        requestChannel.sendResponse(new RequestChannel.Response(receiveRequest.processor(), receiveRequest, new NetworkSend(receiveRequest.connectionId(), new ByteBuffer[]{allocate})));
    }

    public Socket connect(SocketServer socketServer, SecurityProtocol securityProtocol) {
        return new Socket("localhost", server().boundPort(securityProtocol));
    }

    public SocketServer connect$default$1() {
        return server();
    }

    public SecurityProtocol connect$default$2() {
        return SecurityProtocol.PLAINTEXT;
    }

    @After
    public void cleanup() {
        metrics().close();
        server().shutdown();
    }

    private byte[] producerRequestBytes() {
        ProducerRequest producerRequest = new ProducerRequest(-1, SyncProducerConfig$.MODULE$.DefaultClientId(), SyncProducerConfig$.MODULE$.DefaultRequiredAcks(), SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$));
        ByteBuffer allocate = ByteBuffer.allocate(producerRequest.sizeInBytes());
        producerRequest.writeTo(allocate);
        allocate.rewind();
        byte[] bArr = new byte[allocate.remaining()];
        allocate.get(bArr);
        return bArr;
    }

    @Test
    public void simpleRequest() {
        Socket connect = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        Socket connect2 = connect(connect$default$1(), SecurityProtocol.TRACE);
        byte[] producerRequestBytes = producerRequestBytes();
        sendRequest(connect, (short) 0, producerRequestBytes);
        processRequest(server().requestChannel());
        Assert.assertEquals(Predef$.MODULE$.byteArrayOps(producerRequestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(connect)).toSeq());
        sendRequest(connect2, (short) 0, producerRequestBytes);
        processRequest(server().requestChannel());
        Assert.assertEquals(Predef$.MODULE$.byteArrayOps(producerRequestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(connect2)).toSeq());
    }

    @Test
    public void tooBigRequestIsRejected() {
        byte[] bArr = new byte[Predef$.MODULE$.Integer2int(server().config().socketRequestMaxBytes()) + 1];
        new Random().nextBytes(bArr);
        Socket connect = connect(connect$default$1(), connect$default$2());
        sendRequest(connect, (short) 0, bArr);
        try {
            receiveResponse(connect);
        } catch (IOException e) {
        }
    }

    @Test
    public void testSocketsCloseOnShutdown() {
        Socket connect = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        Socket connect2 = connect(connect$default$1(), SecurityProtocol.TRACE);
        byte[] bArr = new byte[40];
        sendRequest(connect, (short) 0, bArr);
        sendRequest(connect2, (short) 0, bArr);
        processRequest(server().requestChannel());
        server().acceptors().values().map(new SocketServerTest$$anonfun$testSocketsCloseOnShutdown$1(this), Iterable$.MODULE$.canBuildFrom());
        server().shutdown();
        byte[] bArr2 = new byte[1000000];
        try {
            sendRequest(connect, (short) 0, bArr2);
            throw fail("expected exception when writing to closed plain socket");
        } catch (IOException e) {
            try {
                sendRequest(connect2, (short) 0, bArr2);
                throw fail("expected exception when writing to closed trace socket");
            } catch (IOException e2) {
            }
        }
    }

    @Test
    public void testMaxConnectionsPerIp() {
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), Predef$.MODULE$.Integer2int(server().config().maxConnectionsPerIp())).map(new SocketServerTest$$anonfun$1(this), IndexedSeq$.MODULE$.canBuildFrom());
        Socket connect = connect(connect$default$1(), connect$default$2());
        connect.setSoTimeout(3000);
        Assert.assertEquals(-1L, connect.getInputStream().read());
        connect.close();
        InetAddress inetAddress = ((Socket) indexedSeq.head()).getInetAddress();
        ((Socket) indexedSeq.head()).close();
        TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testMaxConnectionsPerIp$1(this, indexedSeq, inetAddress), "Failed to decrement connection count after close", TestUtils$.MODULE$.waitUntilTrue$default$3());
        Socket connect2 = connect(connect$default$1(), connect$default$2());
        sendRequest(connect2, (short) 0, producerRequestBytes());
        Assert.assertNotNull(server().requestChannel().receiveRequest(2000L));
        connect2.close();
        ((IterableLike) indexedSeq.tail()).foreach(new SocketServerTest$$anonfun$testMaxConnectionsPerIp$2(this));
    }

    @Test
    public void testMaxConnectionsPerIPOverrides() {
        scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("localhost"), BoxesRunTime.boxToInteger(6))}));
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(0, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), 0, TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14());
        Metrics metrics = new Metrics();
        SocketServer socketServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(createBrokerConfig), metrics, new SystemTime());
        try {
            socketServer.startup();
            IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 6).map(new SocketServerTest$$anonfun$2(this, socketServer), IndexedSeq$.MODULE$.canBuildFrom());
            Socket connect = connect(socketServer, connect$default$2());
            connect.setSoTimeout(3000);
            Assert.assertEquals(-1L, connect.getInputStream().read());
            connect.close();
            indexedSeq.foreach(new SocketServerTest$$anonfun$testMaxConnectionsPerIPOverrides$1(this));
        } finally {
            socketServer.shutdown();
            metrics.close();
        }
    }

    @Test
    public void testSslSocketServer() {
        File createTempFile = File.createTempFile("truststore", ".jks");
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(0, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), new Some<>(createTempFile), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), true, TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14());
        createBrokerConfig.put("listeners", "SSL://localhost:0");
        Metrics metrics = new Metrics();
        SocketServer socketServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(createBrokerConfig), metrics, new SystemTime());
        socketServer.startup();
        try {
            SSLContext sSLContext = SSLContext.getInstance("TLSv1.2");
            sSLContext.init(null, new TrustManager[]{TestUtils$.MODULE$.trustAllCerts()}, new SecureRandom());
            SSLSocket sSLSocket = (SSLSocket) sSLContext.getSocketFactory().createSocket("localhost", socketServer.boundPort(SecurityProtocol.SSL));
            sSLSocket.setNeedClientAuth(false);
            ProducerRequest producerRequest = new ProducerRequest(-1, SyncProducerConfig$.MODULE$.DefaultClientId(), SyncProducerConfig$.MODULE$.DefaultRequiredAcks(), SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$));
            ByteBuffer allocate = ByteBuffer.allocate(producerRequest.sizeInBytes());
            producerRequest.writeTo(allocate);
            allocate.rewind();
            byte[] bArr = new byte[allocate.remaining()];
            allocate.get(bArr);
            sendRequest(sSLSocket, (short) 0, bArr);
            processRequest(socketServer.requestChannel());
            Assert.assertEquals(Predef$.MODULE$.byteArrayOps(bArr).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(sSLSocket)).toSeq());
            sSLSocket.close();
        } finally {
            socketServer.shutdown();
            metrics.close();
        }
    }

    @Test
    public void testSessionPrincipal() {
        Socket connect = connect(connect$default$1(), connect$default$2());
        sendRequest(connect, (short) 0, new byte[40]);
        Assert.assertEquals(KafkaPrincipal.ANONYMOUS, server().requestChannel().receiveRequest().session().principal());
        connect.close();
    }

    public SocketServerTest() {
        props().put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0");
        props().put("num.network.threads", "1");
        props().put("socket.send.buffer.bytes", "300000");
        props().put("socket.receive.buffer.bytes", "300000");
        props().put("queued.max.requests", "50");
        props().put("socket.request.max.bytes", "50");
        props().put("max.connections.per.ip", "5");
        props().put("connections.max.idle.ms", "60000");
        this.config = KafkaConfig$.MODULE$.fromProps(props());
        this.metrics = new Metrics();
        this.server = new SocketServer(config(), metrics(), new SystemTime());
        server().startup();
    }
}
