package org.apache.kafka.server.util;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/server/util/InterBrokerSendThreadTest.class */
public class InterBrokerSendThreadTest {
    private final Time time = new MockTime();
    private final KafkaClient networkClient = (KafkaClient) Mockito.mock(KafkaClient.class);
    private final StubCompletionHandler completionHandler = new StubCompletionHandler();
    private final int requestTimeoutMs = 1000;

    /* loaded from: input_file:org/apache/kafka/server/util/InterBrokerSendThreadTest$StubCompletionHandler.class */
    private static class StubCompletionHandler implements RequestCompletionHandler {
        public boolean executedWithDisconnectedResponse;
        ClientResponse response;

        private StubCompletionHandler() {
            this.executedWithDisconnectedResponse = false;
            this.response = null;
        }

        public void onComplete(ClientResponse clientResponse) {
            this.executedWithDisconnectedResponse = clientResponse.wasDisconnected();
            this.response = clientResponse;
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/util/InterBrokerSendThreadTest$StubRequestBuilder.class */
    private static class StubRequestBuilder<T extends AbstractRequest> extends AbstractRequest.Builder<T> {
        private StubRequestBuilder() {
            super(ApiKeys.END_TXN);
        }

        public T build(short s) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/util/InterBrokerSendThreadTest$TestInterBrokerSendThread.class */
    class TestInterBrokerSendThread extends InterBrokerSendThread {
        private final Consumer<Throwable> exceptionCallback;
        private final Queue<RequestAndCompletionHandler> queue;

        TestInterBrokerSendThread(InterBrokerSendThreadTest interBrokerSendThreadTest) {
            this(interBrokerSendThreadTest.networkClient, th -> {
                if (!(th instanceof RuntimeException)) {
                    throw new RuntimeException(th);
                }
            });
        }

        TestInterBrokerSendThread(KafkaClient kafkaClient, Consumer<Throwable> consumer) {
            super("name", kafkaClient, 1000, InterBrokerSendThreadTest.this.time);
            this.queue = new ArrayDeque();
            this.exceptionCallback = consumer;
        }

        void enqueue(RequestAndCompletionHandler requestAndCompletionHandler) {
            this.queue.offer(requestAndCompletionHandler);
        }

        public Collection<RequestAndCompletionHandler> generateRequests() {
            return this.queue.isEmpty() ? Collections.emptyList() : Collections.singletonList(this.queue.poll());
        }

        protected void pollOnce(long j) {
            try {
                super.pollOnce(j);
            } catch (Throwable th) {
                this.exceptionCallback.accept(th);
            }
        }
    }

    @Test
    public void testShutdownThreadShouldNotCauseException() throws InterruptedException, IOException {
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenThrow(new Throwable[]{new DisconnectException()});
        Mockito.when(Boolean.valueOf(this.networkClient.active())).thenReturn(false);
        AtomicReference atomicReference = new AtomicReference();
        KafkaClient kafkaClient = this.networkClient;
        Objects.requireNonNull(atomicReference);
        TestInterBrokerSendThread testInterBrokerSendThread = new TestInterBrokerSendThread(kafkaClient, (v1) -> {
            r4.getAndSet(v1);
        });
        testInterBrokerSendThread.shutdown();
        testInterBrokerSendThread.pollOnce(100L);
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).initiateClose();
        ((KafkaClient) Mockito.verify(this.networkClient)).close();
        ((KafkaClient) Mockito.verify(this.networkClient)).active();
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Assertions.assertNull(atomicReference.get());
    }

    @Test
    public void testDisconnectWithoutShutdownShouldCauseException() {
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenThrow(new Throwable[]{new DisconnectException()});
        Mockito.when(Boolean.valueOf(this.networkClient.active())).thenReturn(true);
        AtomicReference atomicReference = new AtomicReference();
        KafkaClient kafkaClient = this.networkClient;
        Objects.requireNonNull(atomicReference);
        new TestInterBrokerSendThread(kafkaClient, (v1) -> {
            r4.getAndSet(v1);
        }).pollOnce(100L);
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).active();
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Throwable th = (Throwable) atomicReference.get();
        Assertions.assertNotNull(th);
        Assertions.assertTrue(th instanceof FatalExitError);
    }

    @Test
    public void testShouldNotSendAnythingWhenNoRequests() {
        TestInterBrokerSendThread testInterBrokerSendThread = new TestInterBrokerSendThread(this);
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(Collections.emptyList());
        testInterBrokerSendThread.doWork();
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Assertions.assertFalse(this.completionHandler.executedWithDisconnectedResponse);
    }

    @Test
    public void testShouldCreateClientRequestAndSendWhenNodeIsReady() {
        StubRequestBuilder stubRequestBuilder = new StubRequestBuilder();
        Node node = new Node(1, "", 8080);
        RequestAndCompletionHandler requestAndCompletionHandler = new RequestAndCompletionHandler(this.time.milliseconds(), node, stubRequestBuilder, this.completionHandler);
        TestInterBrokerSendThread testInterBrokerSendThread = new TestInterBrokerSendThread(this);
        ClientRequest clientRequest = new ClientRequest("dest", stubRequestBuilder, 0, "1", 0L, true, 1000, requestAndCompletionHandler.handler);
        Mockito.when(this.networkClient.newClientRequest((String) ArgumentMatchers.eq("1"), (AbstractRequest.Builder) ArgumentMatchers.same(requestAndCompletionHandler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq(true), ArgumentMatchers.eq(1000), (RequestCompletionHandler) ArgumentMatchers.same(requestAndCompletionHandler.handler))).thenReturn(clientRequest);
        Mockito.when(Boolean.valueOf(this.networkClient.ready(node, this.time.milliseconds()))).thenReturn(true);
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(Collections.emptyList());
        testInterBrokerSendThread.enqueue(requestAndCompletionHandler);
        testInterBrokerSendThread.doWork();
        ((KafkaClient) Mockito.verify(this.networkClient)).newClientRequest((String) ArgumentMatchers.eq("1"), (AbstractRequest.Builder) ArgumentMatchers.same(requestAndCompletionHandler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq(true), ArgumentMatchers.eq(1000), (RequestCompletionHandler) ArgumentMatchers.same(requestAndCompletionHandler.handler));
        ((KafkaClient) Mockito.verify(this.networkClient)).ready((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).send((ClientRequest) ArgumentMatchers.same(clientRequest), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Assertions.assertFalse(this.completionHandler.executedWithDisconnectedResponse);
    }

    @Test
    public void testShouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady() {
        StubRequestBuilder stubRequestBuilder = new StubRequestBuilder();
        Node node = new Node(1, "", 8080);
        RequestAndCompletionHandler requestAndCompletionHandler = new RequestAndCompletionHandler(this.time.milliseconds(), node, stubRequestBuilder, this.completionHandler);
        TestInterBrokerSendThread testInterBrokerSendThread = new TestInterBrokerSendThread(this);
        Mockito.when(this.networkClient.newClientRequest((String) ArgumentMatchers.eq("1"), (AbstractRequest.Builder) ArgumentMatchers.same(requestAndCompletionHandler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq(true), ArgumentMatchers.eq(1000), (RequestCompletionHandler) ArgumentMatchers.same(requestAndCompletionHandler.handler))).thenReturn(new ClientRequest("dest", stubRequestBuilder, 0, "1", 0L, true, 1000, requestAndCompletionHandler.handler));
        Mockito.when(Boolean.valueOf(this.networkClient.ready(node, this.time.milliseconds()))).thenReturn(false);
        Mockito.when(Long.valueOf(this.networkClient.connectionDelay((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong()))).thenReturn(0L);
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(Collections.emptyList());
        Mockito.when(Boolean.valueOf(this.networkClient.connectionFailed(node))).thenReturn(true);
        Mockito.when(this.networkClient.authenticationException(node)).thenReturn(new AuthenticationException(""));
        testInterBrokerSendThread.enqueue(requestAndCompletionHandler);
        testInterBrokerSendThread.doWork();
        ((KafkaClient) Mockito.verify(this.networkClient)).newClientRequest((String) ArgumentMatchers.eq("1"), (AbstractRequest.Builder) ArgumentMatchers.same(requestAndCompletionHandler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq(true), ArgumentMatchers.eq(1000), (RequestCompletionHandler) ArgumentMatchers.same(requestAndCompletionHandler.handler));
        ((KafkaClient) Mockito.verify(this.networkClient)).ready((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).connectionDelay((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).connectionFailed((Node) ArgumentMatchers.any());
        ((KafkaClient) Mockito.verify(this.networkClient)).authenticationException((Node) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Assertions.assertTrue(this.completionHandler.executedWithDisconnectedResponse);
    }

    @Test
    public void testFailingExpiredRequests() {
        StubRequestBuilder stubRequestBuilder = new StubRequestBuilder();
        Node node = new Node(1, "", 8080);
        RequestAndCompletionHandler requestAndCompletionHandler = new RequestAndCompletionHandler(this.time.milliseconds(), node, stubRequestBuilder, this.completionHandler);
        TestInterBrokerSendThread testInterBrokerSendThread = new TestInterBrokerSendThread(this);
        ClientRequest clientRequest = new ClientRequest("dest", stubRequestBuilder, 0, "1", this.time.milliseconds(), true, 1000, requestAndCompletionHandler.handler);
        this.time.sleep(1500L);
        Mockito.when(this.networkClient.newClientRequest((String) ArgumentMatchers.eq("1"), (AbstractRequest.Builder) ArgumentMatchers.same(requestAndCompletionHandler.request), ArgumentMatchers.eq(requestAndCompletionHandler.creationTimeMs), ArgumentMatchers.eq(true), ArgumentMatchers.eq(1000), (RequestCompletionHandler) ArgumentMatchers.same(requestAndCompletionHandler.handler))).thenReturn(clientRequest);
        Mockito.when(Boolean.valueOf(this.networkClient.ready(node, this.time.milliseconds()))).thenReturn(false);
        Mockito.when(Long.valueOf(this.networkClient.connectionDelay((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong()))).thenReturn(0L);
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(Collections.emptyList());
        Mockito.when(Boolean.valueOf(this.networkClient.connectionFailed(node))).thenReturn(false);
        testInterBrokerSendThread.enqueue(requestAndCompletionHandler);
        testInterBrokerSendThread.doWork();
        ((KafkaClient) Mockito.verify(this.networkClient)).newClientRequest((String) ArgumentMatchers.eq("1"), (AbstractRequest.Builder) ArgumentMatchers.same(requestAndCompletionHandler.request), ArgumentMatchers.eq(requestAndCompletionHandler.creationTimeMs), ArgumentMatchers.eq(true), ArgumentMatchers.eq(1000), (RequestCompletionHandler) ArgumentMatchers.same(requestAndCompletionHandler.handler));
        ((KafkaClient) Mockito.verify(this.networkClient)).ready((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).connectionDelay((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient) Mockito.verify(this.networkClient)).connectionFailed((Node) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Assertions.assertFalse(testInterBrokerSendThread.hasUnsentRequests());
        Assertions.assertTrue(this.completionHandler.executedWithDisconnectedResponse);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInterruption(boolean z) throws InterruptedException, IOException {
        InterruptedException interruptedException = new InterruptedException();
        Mockito.when(this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenAnswer(invocationOnMock -> {
            throw interruptedException;
        });
        AtomicReference atomicReference = new AtomicReference();
        TestInterBrokerSendThread testInterBrokerSendThread = new TestInterBrokerSendThread(this.networkClient, th -> {
            if (z) {
                Assertions.assertTrue(th instanceof InterruptedException);
            } else {
                Assertions.assertTrue(th instanceof FatalExitError);
            }
            atomicReference.getAndSet(th);
        });
        if (z) {
            testInterBrokerSendThread.shutdown();
        }
        testInterBrokerSendThread.pollOnce(100L);
        ((KafkaClient) Mockito.verify(this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        if (z) {
            ((KafkaClient) Mockito.verify(this.networkClient)).initiateClose();
            ((KafkaClient) Mockito.verify(this.networkClient)).close();
        }
        Mockito.verifyNoMoreInteractions(new Object[]{this.networkClient});
        Assertions.assertNotNull(atomicReference.get());
    }
}
