package io.confluent.kafka.schemaregistry.util;

import io.confluent.kafka.schemaregistry.utils.UserGroupInformationMockPolicy;
import io.confluent.rest.exceptions.RestServerErrorException;
import io.confluent.rest.impersonation.Errors;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IExpectationSetters;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.MockPolicy;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Unit tests for {@link ByteConsumerPool}.
 *
 * <p>All collaborators ({@link KafkaClientSupplier}, {@link IdMappingServiceProvider} and the
 * {@link KafkaConsumer} instances handed out by the supplier) are EasyMock mocks registered with
 * {@link EasyMockSupport}, so every test follows the record / {@code replayAll()} /
 * {@code verifyAll()} cycle.
 *
 * <p>The tests expect the pool to request each consumer from the supplier with a copy of the base
 * configuration plus a {@code group.id} of the form {@code t_<threadId>_u<uid>}, and to call
 * {@code assign}, {@code seekToBeginning} and {@code poll} on that consumer for every
 * {@code ByteConsumerPool.poll} invocation.
 */
@RunWith(PowerMockRunner.class)
@MockPolicy({UserGroupInformationMockPolicy.class})
public class ByteConsumerPoolTest extends EasyMockSupport {
    private static final String ANY_TOPIC = "/some-stream:topic";

    /** User name the id mapper is expected to be asked about when no {@code doAs} is in effect. */
    private static final String USER_NAME = System.getProperty("user.name");

    /** Uid the mocked id mapper returns for {@link #USER_NAME}. */
    private static final int USER_ID = 1000;

    /** Base consumer configuration handed to the pool under test. */
    private final Map<String, Object> consumerConfig = new HashMap<>();

    private KafkaClientSupplier clientSupplier;
    private ByteConsumerPool consumerPool;
    private IdMappingServiceProvider idMapper;

    /** Creates fresh mocks and a pool wired to them before every test. */
    @Before
    public void setUp() {
        // An arbitrary entry: the expected consumer config built in
        // mockConsumerForUserAndThread must contain it alongside group.id.
        this.consumerConfig.put("any.config", "has-to-be-copied");
        this.clientSupplier = mock(KafkaClientSupplier.class);
        this.idMapper = mock(IdMappingServiceProvider.class);
        this.consumerPool = new ByteConsumerPool(this.consumerConfig, this.clientSupplier, this.idMapper);
    }

    /**
     * A single poll must return exactly the records instance produced by the consumer obtained
     * from the supplier for the current thread and user.
     */
    @Test
    public void consumesRecordsFromCreatedConsumer() throws IOException {
        EasyMock.expect(this.idMapper.getUid(USER_NAME)).andReturn(USER_ID);
        KafkaConsumer<byte[], byte[]> consumer = mockConsumerForCurrentThreadAndUser(USER_ID);
        expectConsumerAssignTopic(consumer, ANY_TOPIC);
        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(Collections.emptyMap());
        consumer.seekToBeginning(EasyMock.<Collection<TopicPartition>>anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyObject(Duration.class))).andReturn(records);
        replayAll();

        Assert.assertThat(this.consumerPool.poll(ANY_TOPIC), CoreMatchers.sameInstance(records));

        verifyAll();
    }

    /**
     * Polling as two different remote users on the same thread must use one consumer per user;
     * the first user's consumer is requested from the supplier only once although that user
     * polls twice.
     */
    @Test
    public void cachesConsumerByUser() throws IOException {
        ConsumerRecords<byte[], byte[]> firstUserFirstPoll = new ConsumerRecords<>(Collections.emptyMap());
        ConsumerRecords<byte[], byte[]> firstUserSecondPoll = new ConsumerRecords<>(Collections.emptyMap());
        ConsumerRecords<byte[], byte[]> secondUserPoll = new ConsumerRecords<>(Collections.emptyMap());

        UserGroupInformation firstUser = UserGroupInformation.createRemoteUser("U1");
        EasyMock.expect(this.idMapper.getUid(firstUser.getUserName())).andReturn(1001).times(2);
        KafkaConsumer<byte[], byte[]> firstConsumer = mockConsumerForCurrentThreadAndUser(1001);
        firstConsumer.seekToBeginning(EasyMock.<Collection<TopicPartition>>anyObject());
        EasyMock.expectLastCall().times(2);
        EasyMock.expect(firstConsumer.poll(EasyMock.anyObject(Duration.class)))
                .andReturn(firstUserFirstPoll)
                .andReturn(firstUserSecondPoll);
        expectConsumerAssignTopic(firstConsumer, ANY_TOPIC).times(2);

        UserGroupInformation secondUser = UserGroupInformation.createRemoteUser("U2");
        EasyMock.expect(this.idMapper.getUid(secondUser.getUserName())).andReturn(1002);
        KafkaConsumer<byte[], byte[]> secondConsumer = mockConsumerForCurrentThreadAndUser(1002);
        secondConsumer.seekToBeginning(EasyMock.<Collection<TopicPartition>>anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect(secondConsumer.poll(EasyMock.anyObject(Duration.class))).andReturn(secondUserPoll);
        expectConsumerAssignTopic(secondConsumer, ANY_TOPIC);
        replayAll();

        // Typed as Object on purpose: the assertions below only check instance identity.
        PrivilegedAction<Object> pollAction = () -> this.consumerPool.poll(ANY_TOPIC);

        // Interleave the users so the second poll of U1 happens after U2 has polled.
        Assert.assertThat(firstUser.doAs(pollAction), CoreMatchers.sameInstance((Object) firstUserFirstPoll));
        Assert.assertThat(secondUser.doAs(pollAction), CoreMatchers.sameInstance((Object) secondUserPoll));
        Assert.assertThat(firstUser.doAs(pollAction), CoreMatchers.sameInstance((Object) firstUserSecondPoll));

        verifyAll();
    }

    /**
     * Polling as the same user from two different threads must use one consumer per thread;
     * the first thread's consumer is requested from the supplier only once although that thread
     * polls twice.
     */
    @Test
    public void cachesConsumerByThread() throws Exception {
        ExecutorService firstExecutor = Executors.newSingleThreadExecutor();
        ExecutorService secondExecutor = Executors.newSingleThreadExecutor();
        try {
            EasyMock.expect(this.idMapper.getUid(USER_NAME)).andReturn(USER_ID).times(3);
            ConsumerRecords<byte[], byte[]> firstThreadFirstPoll = new ConsumerRecords<>(Collections.emptyMap());
            ConsumerRecords<byte[], byte[]> firstThreadSecondPoll = new ConsumerRecords<>(Collections.emptyMap());
            ConsumerRecords<byte[], byte[]> secondThreadPoll = new ConsumerRecords<>(Collections.emptyMap());

            // Each single-thread executor keeps one worker, so its thread id is known up front
            // and can be baked into the expected group.id.
            KafkaConsumer<byte[], byte[]> firstConsumer =
                    mockConsumerForUserAndThread(USER_ID, threadIdOf(firstExecutor));
            firstConsumer.seekToBeginning(EasyMock.<Collection<TopicPartition>>anyObject());
            EasyMock.expectLastCall().times(2);
            EasyMock.expect(firstConsumer.poll(EasyMock.anyObject(Duration.class)))
                    .andReturn(firstThreadFirstPoll)
                    .andReturn(firstThreadSecondPoll);
            expectConsumerAssignTopic(firstConsumer, ANY_TOPIC).times(2);

            KafkaConsumer<byte[], byte[]> secondConsumer =
                    mockConsumerForUserAndThread(USER_ID, threadIdOf(secondExecutor));
            secondConsumer.seekToBeginning(EasyMock.<Collection<TopicPartition>>anyObject());
            EasyMock.expectLastCall();
            EasyMock.expect(secondConsumer.poll(EasyMock.anyObject(Duration.class))).andReturn(secondThreadPoll);
            expectConsumerAssignTopic(secondConsumer, ANY_TOPIC);
            replayAll();

            // Typed as Object on purpose: the assertions below only check instance identity.
            Callable<Object> pollTask = () -> this.consumerPool.poll(ANY_TOPIC);

            // Interleave the threads so the second poll of thread 1 happens after thread 2 polled.
            Assert.assertThat(firstExecutor.submit(pollTask).get(),
                    CoreMatchers.sameInstance((Object) firstThreadFirstPoll));
            Assert.assertThat(secondExecutor.submit(pollTask).get(),
                    CoreMatchers.sameInstance((Object) secondThreadPoll));
            Assert.assertThat(firstExecutor.submit(pollTask).get(),
                    CoreMatchers.sameInstance((Object) firstThreadSecondPoll));

            verifyAll();
        } finally {
            // Without this the executors' non-daemon worker threads outlive the test.
            firstExecutor.shutdownNow();
            secondExecutor.shutdownNow();
        }
    }

    /**
     * A runtime failure thrown by the consumer's {@code poll} must surface as a
     * {@link RestServerErrorException} whose cause, error code and message match what
     * {@link Errors#serverLoginException} builds for the same failure.
     */
    @Test
    public void throwsServerLoginExceptionOnConsumerFailure() throws IOException {
        EasyMock.expect(this.idMapper.getUid(USER_NAME)).andReturn(USER_ID);
        RuntimeException consumerFailure = new RuntimeException("Why not");
        KafkaConsumer<byte[], byte[]> consumer = mockConsumerForCurrentThreadAndUser(USER_ID);
        consumer.seekToBeginning(EasyMock.<Collection<TopicPartition>>anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyObject(Duration.class))).andThrow(consumerFailure);
        expectConsumerAssignTopic(consumer, ANY_TOPIC);
        replayAll();

        try {
            this.consumerPool.poll(ANY_TOPIC);
            Assert.fail("Expected poll to throw RestServerErrorException");
        } catch (RestServerErrorException e) {
            RestServerErrorException expected = Errors.serverLoginException(consumerFailure);
            Assert.assertThat(e.getCause(), CoreMatchers.is(expected.getCause()));
            Assert.assertThat(e.getErrorCode(), CoreMatchers.is(expected.getErrorCode()));
            Assert.assertThat(e.getMessage(), CoreMatchers.is(expected.getMessage()));
        }

        verifyAll();
    }

    /**
     * Runs a trivial task on {@code executor} and returns the id of the thread that executed it.
     *
     * @param executor executor to probe
     * @return id of the executor thread that ran the probe task
     * @throws Exception if the probe task fails or the wait for it is interrupted
     */
    private static long threadIdOf(ExecutorService executor) throws Exception {
        Callable<Long> currentThreadId = () -> Thread.currentThread().getId();
        return executor.submit(currentThreadId).get();
    }

    /**
     * Same as {@link #mockConsumerForUserAndThread(long, long)} using the id of the calling
     * thread.
     *
     * @param userId uid expected in the consumer's {@code group.id}
     * @return the mock consumer the supplier is set up to return
     */
    private KafkaConsumer<byte[], byte[]> mockConsumerForCurrentThreadAndUser(int userId) {
        return mockConsumerForUserAndThread(userId, Thread.currentThread().getId());
    }

    /**
     * Creates a mock consumer and records the expectation that the client supplier is asked
     * exactly once for a consumer configured with the base config plus
     * {@code group.id = t_<threadId>_u<userId>}.
     *
     * @param userId uid expected in the {@code group.id}
     * @param threadId thread id expected in the {@code group.id}
     * @return the mock consumer the supplier is set up to return
     */
    private KafkaConsumer<byte[], byte[]> mockConsumerForUserAndThread(long userId, long threadId) {
        @SuppressWarnings("unchecked") // mock(Class) cannot carry the consumer's type arguments
        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
        Map<String, Object> expectedConfig = new HashMap<>(this.consumerConfig);
        expectedConfig.put("group.id", String.format("t_%d_u%d", threadId, userId));
        EasyMock.expect(this.clientSupplier.getConsumer(expectedConfig)).andReturn(consumer).once();
        return consumer;
    }

    /**
     * Records the expectation that {@code kafkaConsumer} is assigned partition 0 of the given
     * topic.
     *
     * @param kafkaConsumer mock consumer, still in record state
     * @param str topic name
     * @return the expectation setter, so callers can adjust the expected call count
     */
    private IExpectationSetters<Object> expectConsumerAssignTopic(KafkaConsumer<byte[], byte[]> kafkaConsumer, String str) {
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition(str, 0)));
        return EasyMock.expectLastCall();
    }
}
