/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.util;

import io.confluent.kafka.schemaregistry.util.ByteConsumerPool;
import io.confluent.kafka.schemaregistry.util.KafkaClientSupplier;
import io.confluent.kafka.schemaregistry.utils.UserGroupInformationMockPolicy;
import io.confluent.rest.exceptions.RestServerErrorException;
import io.confluent.rest.impersonation.Errors;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IExpectationSetters;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.MockPolicy;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@MockPolicy(value={UserGroupInformationMockPolicy.class})
public class ByteConsumerPoolTest
extends EasyMockSupport {
    private static final String ANY_TOPIC = "/some-stream:topic";
    private static final String USER_NAME = System.getProperty("user.name");
    private static final int USER_ID = 1000;
    private Map<String, Object> consumerConfig = new HashMap<String, Object>();
    private KafkaClientSupplier clientSupplier;
    private ByteConsumerPool consumerPool;
    private IdMappingServiceProvider idMapper;

    @Before
    public void setUp() {
        this.consumerConfig.put("any.config", "has-to-be-copied");
        this.clientSupplier = (KafkaClientSupplier)this.mock(KafkaClientSupplier.class);
        this.idMapper = (IdMappingServiceProvider)this.mock(IdMappingServiceProvider.class);
        this.consumerPool = new ByteConsumerPool(this.consumerConfig, this.clientSupplier, this.idMapper);
    }

    @Test
    public void consumesRecordsFromCreatedConsumer() throws IOException {
        EasyMock.expect((Object)this.idMapper.getUid(USER_NAME)).andReturn((Object)1000);
        KafkaConsumer<byte[], byte[]> consumer = this.mockConsumerForCurrentThreadAndUser(1000);
        this.expectConsumerAssignTopic(consumer, ANY_TOPIC);
        ConsumerRecords expectedRecords = new ConsumerRecords(Collections.emptyMap());
        EasyMock.expect((Object)consumer.poll((Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)expectedRecords);
        this.replayAll();
        ByteConsumerPool consumerPool = this.consumerPool;
        Assert.assertThat((Object)consumerPool.poll(ANY_TOPIC), (Matcher)CoreMatchers.sameInstance((Object)expectedRecords));
        this.verifyAll();
    }

    @Test
    public void cachesConsumerByUser() throws IOException {
        ConsumerRecords expectedU1C1 = new ConsumerRecords(Collections.emptyMap());
        ConsumerRecords expectedU1C2 = new ConsumerRecords(Collections.emptyMap());
        ConsumerRecords expectedU2C1 = new ConsumerRecords(Collections.emptyMap());
        UserGroupInformation u1 = UserGroupInformation.createRemoteUser((String)"U1");
        EasyMock.expect((Object)this.idMapper.getUid(u1.getUserName())).andReturn((Object)1001).times(2);
        KafkaConsumer<byte[], byte[]> consumerU1 = this.mockConsumerForCurrentThreadAndUser(1001);
        EasyMock.expect((Object)consumerU1.poll((Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)expectedU1C1).andReturn((Object)expectedU1C2);
        this.expectConsumerAssignTopic(consumerU1, ANY_TOPIC).times(2);
        UserGroupInformation u2 = UserGroupInformation.createRemoteUser((String)"U2");
        EasyMock.expect((Object)this.idMapper.getUid(u2.getUserName())).andReturn((Object)1002);
        KafkaConsumer<byte[], byte[]> consumerU2 = this.mockConsumerForCurrentThreadAndUser(1002);
        EasyMock.expect((Object)consumerU2.poll((Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)expectedU2C1);
        this.expectConsumerAssignTopic(consumerU2, ANY_TOPIC);
        this.replayAll();
        ByteConsumerPool consumerPool = this.consumerPool;
        PrivilegedAction<ConsumerRecords> poll = () -> consumerPool.poll(ANY_TOPIC);
        Assert.assertThat((Object)((ConsumerRecords)u1.doAs(poll)), (Matcher)CoreMatchers.sameInstance((Object)expectedU1C1));
        Assert.assertThat((Object)((ConsumerRecords)u2.doAs(poll)), (Matcher)CoreMatchers.sameInstance((Object)expectedU2C1));
        Assert.assertThat((Object)((ConsumerRecords)u1.doAs(poll)), (Matcher)CoreMatchers.sameInstance((Object)expectedU1C2));
        this.verifyAll();
    }

    @Test
    public void cachesConsumerByThread() throws Exception {
        EasyMock.expect((Object)this.idMapper.getUid(USER_NAME)).andReturn((Object)1000).times(3);
        ConsumerRecords expectedT1C1 = new ConsumerRecords(Collections.emptyMap());
        ConsumerRecords expectedT1C2 = new ConsumerRecords(Collections.emptyMap());
        ConsumerRecords expectedT2C1 = new ConsumerRecords(Collections.emptyMap());
        ExecutorService t1 = Executors.newSingleThreadExecutor();
        Long idT1 = t1.submit(() -> Thread.currentThread().getId()).get();
        KafkaConsumer<byte[], byte[]> consumerT1 = this.mockConsumerForUserAndThread(1000L, idT1);
        EasyMock.expect((Object)consumerT1.poll((Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)expectedT1C1).andReturn((Object)expectedT1C2);
        this.expectConsumerAssignTopic(consumerT1, ANY_TOPIC).times(2);
        ExecutorService t2 = Executors.newSingleThreadExecutor();
        Long idT2 = t2.submit(() -> Thread.currentThread().getId()).get();
        KafkaConsumer<byte[], byte[]> consumerT2 = this.mockConsumerForUserAndThread(1000L, idT2);
        EasyMock.expect((Object)consumerT2.poll((Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)expectedT2C1);
        this.expectConsumerAssignTopic(consumerT2, ANY_TOPIC);
        this.replayAll();
        Callable<ConsumerRecords> poll = () -> this.consumerPool.poll(ANY_TOPIC);
        Assert.assertThat((Object)t1.submit(poll).get(), (Matcher)CoreMatchers.sameInstance((Object)expectedT1C1));
        Assert.assertThat((Object)t2.submit(poll).get(), (Matcher)CoreMatchers.sameInstance((Object)expectedT2C1));
        Assert.assertThat((Object)t1.submit(poll).get(), (Matcher)CoreMatchers.sameInstance((Object)expectedT1C2));
        this.verifyAll();
    }

    @Test
    public void throwsServerLoginExceptionOnConsumerFailure() throws IOException {
        EasyMock.expect((Object)this.idMapper.getUid(USER_NAME)).andReturn((Object)1000);
        RuntimeException cause = new RuntimeException("Why not");
        KafkaConsumer<byte[], byte[]> consumer = this.mockConsumerForCurrentThreadAndUser(1000);
        EasyMock.expect((Object)consumer.poll((Duration)EasyMock.anyObject())).andThrow((Throwable)cause);
        this.expectConsumerAssignTopic(consumer, ANY_TOPIC);
        this.replayAll();
        try {
            this.consumerPool.poll(ANY_TOPIC);
            Assert.fail();
        }
        catch (RestServerErrorException e) {
            RestServerErrorException expected = Errors.serverLoginException((Throwable)cause);
            Assert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.is((Object)expected.getCause()));
            Assert.assertThat((Object)e.getErrorCode(), (Matcher)CoreMatchers.is((Object)expected.getErrorCode()));
            Assert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.is((Object)expected.getMessage()));
        }
        this.verifyAll();
    }

    private KafkaConsumer<byte[], byte[]> mockConsumerForCurrentThreadAndUser(int userId) {
        return this.mockConsumerForUserAndThread(userId, Thread.currentThread().getId());
    }

    private KafkaConsumer<byte[], byte[]> mockConsumerForUserAndThread(long userId, long threadId) {
        KafkaConsumer consumer = (KafkaConsumer)this.mock(KafkaConsumer.class);
        HashMap<String, Object> expectedProps = new HashMap<String, Object>(this.consumerConfig);
        expectedProps.put("group.id", String.format("t_%d_u%d", threadId, userId));
        EasyMock.expect((Object)this.clientSupplier.getConsumer(expectedProps)).andReturn((Object)consumer).once();
        return consumer;
    }

    private IExpectationSetters<Object> expectConsumerAssignTopic(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        consumer.assign(Collections.singletonList(new TopicPartition(topic, 0)));
        return EasyMock.expectLastCall();
    }
}

