package org.apache.kafka.clients;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest.class */
/**
 * Unit tests for {@link NetworkClient}.
 *
 * <p>All tests run against a {@link MockSelector} and a {@link MockTime}, so no real sockets are
 * opened and the clock only moves when a test (or the mock selector's poll) advances it.
 * Two clients share the same selector: {@code client}, which is backed by a {@link Metadata}
 * instance, and {@code clientWithStaticNodes}, which is backed by a {@link ManualMetadataUpdater}
 * holding a fixed node list.
 */
public class NetworkClientTest {
    /** Upper bound on the number of polls {@link #awaitReady} performs before failing the test. */
    private static final int MAX_READY_ATTEMPTS = 100;

    /** Value handed to both clients as their request-timeout constructor argument. */
    private final int requestTimeoutMs = 1000;
    private final MockTime time = new MockTime();
    private final MockSelector selector = new MockSelector(time);
    private final Metadata metadata = new Metadata(0, Long.MAX_VALUE);
    private final int nodeId = 1;
    private final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
    private final Node node = (Node) cluster.nodes().get(0);
    private final long reconnectBackoffMsTest = 10000;
    private final NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
            reconnectBackoffMsTest, 65536, 65536, requestTimeoutMs, time);
    private final NetworkClient clientWithStaticNodes = new NetworkClient(selector,
            new ManualMetadataUpdater(Arrays.asList(node)), "mock-static", Integer.MAX_VALUE, 0, 65536, 65536,
            requestTimeoutMs, time);

    /**
     * Completion handler that records whether it was invoked and with which response, so tests
     * can assert on the callback afterwards.
     */
    public static class TestCallbackHandler implements RequestCompletionHandler {
        public boolean executed;
        public ClientResponse response;

        private TestCallbackHandler() {
            this.executed = false;
        }

        public void onComplete(ClientResponse clientResponse) {
            this.executed = true;
            this.response = clientResponse;
        }
    }

    /** Seeds the shared metadata with the single-node test cluster before every test. */
    @Before
    public void setup() {
        metadata.update(cluster, time.milliseconds());
    }

    /**
     * Sending to node id "5", for which this test never initiated a connection, is expected to
     * raise an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testSendToUnreadyNode() {
        RequestSend send = new RequestSend("5", client.nextRequestHeader(ApiKeys.METADATA),
                new MetadataRequest(Arrays.asList("test")).toStruct());
        ClientRequest request = new ClientRequest(time.milliseconds(), false, send, (RequestCompletionHandler) null);
        client.send(request, time.milliseconds());
        client.poll(1L, time.milliseconds());
    }

    /** Runs the request/response round trip against the metadata-backed client. */
    @Test
    public void testSimpleRequestResponse() {
        checkSimpleRequestResponse(client);
    }

    /** Runs the request/response round trip against the client with a static node list. */
    @Test
    public void testSimpleRequestResponseWithStaticNodes() {
        checkSimpleRequestResponse(clientWithStaticNodes);
    }

    /**
     * Closing a node's connection must drop its in-flight requests and leave the node not ready.
     */
    @Test
    public void testClose() {
        client.ready(node, time.milliseconds());
        awaitReady(client, node);
        client.poll(1L, time.milliseconds());
        Assert.assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));

        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap());
        RequestSend send = new RequestSend(node.idString(), client.nextRequestHeader(ApiKeys.PRODUCE),
                produceRequest.toStruct());
        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, (RequestCompletionHandler) null);
        client.send(request, time.milliseconds());
        Assert.assertEquals("There should be 1 in-flight request after send", 1L,
                client.inFlightRequestCount(node.idString()));

        client.close(node.idString());
        Assert.assertEquals("There should be no in-flight request after close", 0L,
                client.inFlightRequestCount(node.idString()));
        // Use the same mock clock as the readiness check above (the clock has not advanced since),
        // so the only thing that can have changed the answer is the close() call.
        Assert.assertFalse("Connection should not be ready after close", client.isReady(node, time.milliseconds()));
    }

    /**
     * Sends an empty produce request through {@code networkClient}, feeds a matching response
     * into the mock selector, and verifies that exactly one response is returned by poll and
     * that the callback received it.
     *
     * @param networkClient the client under test
     */
    private void checkSimpleRequestResponse(NetworkClient networkClient) {
        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap());
        RequestHeader requestHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);
        RequestSend send = new RequestSend(node.idString(), requestHeader, produceRequest.toStruct());
        TestCallbackHandler handler = new TestCallbackHandler();
        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);

        awaitReady(networkClient, node);
        networkClient.send(request, time.milliseconds());
        networkClient.poll(1L, time.milliseconds());
        Assert.assertEquals(1L, networkClient.inFlightRequestCount());

        // Build a serialized response carrying the request's correlation id and an empty body.
        ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
        Struct responseBody = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
        responseBody.set("responses", new Object[0]);
        ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + responseBody.sizeOf());
        responseHeader.writeTo(buffer);
        responseBody.writeTo(buffer);
        buffer.flip();
        selector.completeReceive(new NetworkReceive(node.idString(), buffer));

        Assert.assertEquals(1L, networkClient.poll(1L, time.milliseconds()).size());
        Assert.assertTrue("The handler should have executed.", handler.executed);
        Assert.assertTrue("Should have a response body.", handler.response.hasResponse());
        Assert.assertEquals("Should be correlated to the original request", request, handler.response.request());
    }

    /**
     * Polls {@code networkClient} until it reports {@code node} as ready.
     *
     * <p>The loop is bounded by {@link #MAX_READY_ATTEMPTS} so that a client which never becomes
     * ready fails the test instead of hanging the run.
     *
     * @param networkClient the client to poll
     * @param node the node to wait for
     */
    private void awaitReady(NetworkClient networkClient, Node node) {
        for (int attempt = 0; attempt < MAX_READY_ATTEMPTS; attempt++) {
            if (networkClient.ready(node, time.milliseconds())) {
                return;
            }
            networkClient.poll(1L, time.milliseconds());
        }
        Assert.assertTrue("Node " + node.idString() + " did not become ready after " + MAX_READY_ATTEMPTS + " polls",
                networkClient.ready(node, time.milliseconds()));
    }

    /**
     * After the mock clock moves past the request timeout with a request still in flight, polling
     * must disconnect the node.
     */
    @Test
    public void testRequestTimeout() {
        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap());
        RequestSend send = new RequestSend(node.idString(), client.nextRequestHeader(ApiKeys.PRODUCE),
                produceRequest.toStruct());
        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, new TestCallbackHandler());
        awaitReady(client, node);
        client.send(request, time.milliseconds());

        // Advance well beyond the configured request timeout (3 x 1000 ms = 3000 ms).
        long elapsedMs = 3L * requestTimeoutMs;
        time.sleep(elapsedMs);
        client.poll(elapsedMs, time.milliseconds());

        Assert.assertFalse("The timed-out node should have been disconnected", selector.disconnected().isEmpty());
        Assert.assertEquals(node.idString(), selector.disconnected().get(0));
    }

    /**
     * The single connected node is the least loaded node; once its connection is forcibly closed
     * there must be no least loaded node at all.
     */
    @Test
    public void testLeastLoadedNode() {
        client.ready(node, time.milliseconds());
        awaitReady(client, node);
        client.poll(1L, time.milliseconds());
        Assert.assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
        Assert.assertEquals("There should be one leastloadednode", node.id(),
                client.leastLoadedNode(time.milliseconds()).id());

        // Let the reconnect backoff elapse, then drop the connection from the selector side.
        time.sleep(reconnectBackoffMsTest);
        selector.close(node.idString());
        client.poll(1L, time.milliseconds());
        Assert.assertFalse("After we forced the disconnection the client is no longer ready.",
                client.ready(node, time.milliseconds()));
        Assert.assertNull("There should be NO leastloadednode", client.leastLoadedNode(time.milliseconds()));
    }
}
