package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.class */
public class CommitRequestManagerTest {
    private SubscriptionState subscriptionState;
    private GroupState groupState;
    private LogContext logContext;
    private MockTime time;
    private CoordinatorRequestManager coordinatorRequestManager;
    private Properties props;
    private Node mockedNode = new Node(1, "host1", 9092);

    @BeforeEach
    public void setup() {
        this.logContext = new LogContext();
        this.time = new MockTime(0L);
        this.subscriptionState = (SubscriptionState) Mockito.mock(SubscriptionState.class);
        this.coordinatorRequestManager = (CoordinatorRequestManager) Mockito.mock(CoordinatorRequestManager.class);
        this.groupState = new GroupState("group-1", Optional.empty());
        this.props = new Properties();
        this.props.put("auto.commit.interval.ms", 100);
        this.props.put("key.deserializer", StringDeserializer.class);
        this.props.put("value.deserializer", StringDeserializer.class);
    }

    @Test
    public void testPoll_SkipIfCoordinatorUnknown() {
        CommitRequestManager create = create(false, 0L);
        assertPoll(false, 0, create);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        create.addOffsetCommitRequest(hashMap);
        assertPoll(false, 0, create);
    }

    @Test
    public void testPoll_EnsureManualCommitSent() {
        CommitRequestManager create = create(false, 0L);
        assertPoll(0, create);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        create.addOffsetCommitRequest(hashMap);
        assertPoll(1, create);
    }

    @Test
    public void testPoll_EnsureAutocommitSent() {
        CommitRequestManager create = create(true, 100L);
        assertPoll(0, create);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        create.updateAutoCommitTimer(this.time.milliseconds());
        Mockito.when(this.subscriptionState.allConsumed()).thenReturn(hashMap);
        this.time.sleep(100L);
        create.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(1, create);
    }

    @Test
    public void testPoll_EnsureCorrectInflightRequestBufferSize() {
        CommitRequestManager create = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("test", 0), new OffsetAndMetadata(10L));
        hashMap.put(new TopicPartition("test", 1), new OffsetAndMetadata(20L));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new TopicPartition("test", 3), new OffsetAndMetadata(20L));
        hashMap2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList.add(create.addOffsetCommitRequest(hashMap));
        arrayList2.add(create.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 0))));
        arrayList.add(create.addOffsetCommitRequest(hashMap2));
        arrayList2.add(create.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 1))));
        NetworkClientDelegate.PollResult poll = create.poll(this.time.milliseconds());
        Assertions.assertEquals(4, poll.unsentRequests.size());
        Assertions.assertTrue(poll.unsentRequests.stream().anyMatch(unsentRequest -> {
            return unsentRequest.requestBuilder() instanceof OffsetCommitRequest.Builder;
        }));
        Assertions.assertTrue(poll.unsentRequests.stream().anyMatch(unsentRequest2 -> {
            return unsentRequest2.requestBuilder() instanceof OffsetFetchRequest.Builder;
        }));
        Assertions.assertFalse(create.pendingRequests.hasUnsentRequests());
        Assertions.assertEquals(2, create.pendingRequests.inflightOffsetFetches.size());
        arrayList.forEach(completableFuture -> {
            completableFuture.complete(null);
        });
        arrayList2.forEach(completableFuture2 -> {
            completableFuture2.complete(null);
        });
        Assertions.assertEquals(0, create.pendingRequests.inflightOffsetFetches.size());
    }

    @Test
    public void testPoll_EnsureEmptyPendingRequestAfterPoll() {
        CommitRequestManager create = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        create.addOffsetCommitRequest(new HashMap());
        Assertions.assertEquals(1, create.unsentOffsetCommitRequests().size());
        create.poll(this.time.milliseconds());
        Assertions.assertTrue(create.unsentOffsetCommitRequests().isEmpty());
        assertEmptyPendingRequests(create);
    }

    @Test
    public void testAutocommit_ResendAutocommitAfterException() {
        CommitRequestManager create = create(true, 100L);
        this.time.sleep(100L);
        create.updateAutoCommitTimer(this.time.milliseconds());
        List<CompletableFuture<ClientResponse>> assertPoll = assertPoll(1, create);
        this.time.sleep(99L);
        assertPoll.get(0).completeExceptionally(new KafkaException("test exception"));
        create.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(0, create);
        this.time.sleep(1L);
        create.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(1, create);
        assertEmptyPendingRequests(create);
    }

    @Test
    public void testAutocommit_EnsureOnlyOneInflightRequest() {
        CommitRequestManager create = create(true, 100L);
        this.time.sleep(100L);
        create.updateAutoCommitTimer(this.time.milliseconds());
        List<CompletableFuture<ClientResponse>> assertPoll = assertPoll(1, create);
        this.time.sleep(100L);
        create.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(0, create);
        assertEmptyPendingRequests(create);
        assertPoll.get(0).complete(null);
        assertPoll(1, create);
    }

    @Test
    public void testOffsetFetchRequest_EnsureDuplicatedRequestSucceed() {
        CommitRequestManager create = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet hashSet = new HashSet();
        hashSet.add(new TopicPartition("t1", 0));
        sendAndVerifyDuplicatedRequests(create, hashSet, 2, Errors.NONE).forEach(completableFuture -> {
            Assertions.assertTrue(completableFuture.isDone());
            Assertions.assertFalse(completableFuture.isCompletedExceptionally());
        });
        create.poll(0L);
        assertEmptyPendingRequests(create);
    }

    @MethodSource({"exceptionSupplier"})
    @ParameterizedTest
    public void testOffsetFetchRequest_ErroredRequests(Errors errors, boolean z) {
        CommitRequestManager create = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet hashSet = new HashSet();
        hashSet.add(new TopicPartition("t1", 0));
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedRequests = sendAndVerifyDuplicatedRequests(create, hashSet, 5, errors);
        if (z) {
            testRetriable(create, sendAndVerifyDuplicatedRequests);
        } else {
            testNonRetriable(sendAndVerifyDuplicatedRequests);
            assertEmptyPendingRequests(create);
        }
    }

    private void testRetriable(CommitRequestManager commitRequestManager, List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> list) {
        list.forEach(completableFuture -> {
            Assertions.assertFalse(completableFuture.isDone());
        });
        this.time.sleep(500L);
        commitRequestManager.poll(this.time.milliseconds());
        list.forEach(completableFuture2 -> {
            Assertions.assertFalse(completableFuture2.isDone());
        });
    }

    private void testNonRetriable(List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> list) {
        list.forEach(completableFuture -> {
            Assertions.assertTrue(completableFuture.isCompletedExceptionally());
        });
    }

    private static Stream<Arguments> exceptionSupplier() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{Errors.NOT_COORDINATOR, true}), Arguments.of(new Object[]{Errors.COORDINATOR_LOAD_IN_PROGRESS, true}), Arguments.of(new Object[]{Errors.UNKNOWN_SERVER_ERROR, false}), Arguments.of(new Object[]{Errors.GROUP_AUTHORIZATION_FAILED, false}), Arguments.of(new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED, false})});
    }

    @MethodSource({"partitionDataErrorSupplier"})
    @ParameterizedTest
    public void testOffsetFetchRequest_PartitionDataError(Errors errors, boolean z) {
        CommitRequestManager create = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet hashSet = new HashSet();
        TopicPartition topicPartition = new TopicPartition("t1", 2);
        TopicPartition topicPartition2 = new TopicPartition("t2", 3);
        hashSet.add(topicPartition);
        hashSet.add(topicPartition2);
        CompletableFuture addOffsetFetchRequest = create.addOffsetFetchRequest(hashSet);
        NetworkClientDelegate.PollResult poll = create.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        HashMap<TopicPartition, OffsetFetchResponse.PartitionData> hashMap = new HashMap<>();
        hashMap.put(topicPartition, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", errors));
        hashMap.put(topicPartition2, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).future().complete(buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0), hashMap, Errors.NONE));
        if (z) {
            testRetriable(create, Collections.singletonList(addOffsetFetchRequest));
        } else {
            testNonRetriable(Collections.singletonList(addOffsetFetchRequest));
        }
    }

    private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
        Assertions.assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
        Assertions.assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
        Assertions.assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
    }

    private static Stream<Arguments> partitionDataErrorSupplier() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{Errors.UNSTABLE_OFFSET_COMMIT, true}), Arguments.of(new Object[]{Errors.UNKNOWN_TOPIC_OR_PARTITION, false}), Arguments.of(new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED, false}), Arguments.of(new Object[]{Errors.UNKNOWN_SERVER_ERROR, false})});
    }

    private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedRequests(CommitRequestManager commitRequestManager, Set<TopicPartition> set, int i, Errors errors) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(commitRequestManager.addOffsetFetchRequest(set));
        }
        NetworkClientDelegate.PollResult poll = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).future().complete(buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0), set, errors));
        Assertions.assertEquals(0, commitRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
        return arrayList;
    }

    private List<CompletableFuture<ClientResponse>> assertPoll(int i, CommitRequestManager commitRequestManager) {
        return assertPoll(true, i, commitRequestManager);
    }

    private List<CompletableFuture<ClientResponse>> assertPoll(boolean z, int i, CommitRequestManager commitRequestManager) {
        if (z) {
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        } else {
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
        }
        NetworkClientDelegate.PollResult poll = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(i, poll.unsentRequests.size());
        return (List) poll.unsentRequests.stream().map((v0) -> {
            return v0.future();
        }).collect(Collectors.toList());
    }

    private CommitRequestManager create(boolean z, long j) {
        this.props.setProperty("auto.commit.interval.ms", String.valueOf(j));
        this.props.setProperty("enable.auto.commit", String.valueOf(z));
        return new CommitRequestManager(this.time, this.logContext, this.subscriptionState, new ConsumerConfig(this.props), this.coordinatorRequestManager, this.groupState);
    }

    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Set<TopicPartition> set, Errors errors) {
        HashMap<TopicPartition, OffsetFetchResponse.PartitionData> hashMap = new HashMap<>();
        set.forEach(topicPartition -> {
            hashMap.put(topicPartition, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
        });
        return buildOffsetFetchClientResponse(unsentRequest, hashMap, errors);
    }

    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, HashMap<TopicPartition, OffsetFetchResponse.PartitionData> hashMap, Errors errors) {
        OffsetFetchRequest build = unsentRequest.requestBuilder().build();
        Assertions.assertTrue(build instanceof OffsetFetchRequest);
        OffsetFetchRequest offsetFetchRequest = build;
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1), unsentRequest.callback(), "-1", this.time.milliseconds(), this.time.milliseconds(), false, (UnsupportedVersionException) null, (AuthenticationException) null, new OffsetFetchResponse(errors, hashMap));
    }
}
