/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror.integration;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
import org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(value="integration")
public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest
extends MirrorConnectorsIntegrationBaseTest {
    private static final int FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS = 60000;

    protected static void enableAclAuthorizer(Properties brokerProps) {
        brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\" user_connector=\"connector_pwd\" user_super=\"super_pwd\";");
        brokerProps.put("super.users", "User:super");
    }

    protected static Map<String, String> superUserConfig() {
        HashMap<String, String> superUserClientConfig = new HashMap<String, String>();
        superUserClientConfig.put("sasl.mechanism", "PLAIN");
        superUserClientConfig.put("security.protocol", "SASL_PLAINTEXT");
        superUserClientConfig.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\";");
        return superUserClientConfig;
    }

    protected static Map<String, String> connectorUserConfig() {
        HashMap<String, String> connectUserClientConfig = new HashMap<String, String>();
        connectUserClientConfig.put("sasl.mechanism", "PLAIN");
        connectUserClientConfig.put("security.protocol", "SASL_PLAINTEXT");
        connectUserClientConfig.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"connector\" password=\"connector_pwd\";");
        return connectUserClientConfig;
    }

    private static void deleteAllACLs(EmbeddedKafkaCluster cluster) throws Exception {
        try (Admin adminClient = cluster.createAdminClient();){
            Set topicsToBeDeleted = (Set)adminClient.listTopics().names().get();
            List aclBindingFilters = topicsToBeDeleted.stream().map(topic -> new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, topic, PatternType.ANY), AccessControlEntryFilter.ANY)).collect(Collectors.toList());
            adminClient.deleteAcls(aclBindingFilters);
        }
    }

    protected static Collection<AclBinding> getAclBindings(EmbeddedKafkaCluster cluster, String topic) throws Exception {
        try (Admin client = cluster.createAdminClient();){
            ResourcePatternFilter topicFilter = new ResourcePatternFilter(ResourceType.TOPIC, topic, PatternType.ANY);
            Collection collection = (Collection)client.describeAcls(new AclBindingFilter(topicFilter, AccessControlEntryFilter.ANY)).values().get();
            return collection;
        }
    }

    @Override
    @BeforeEach
    public void startClusters() throws Exception {
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.enableAclAuthorizer(this.primaryBrokerProps);
        this.additionalPrimaryClusterClientsConfigs.putAll(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.superUserConfig());
        this.primaryWorkerProps.putAll(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.superUserConfig());
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.enableAclAuthorizer(this.backupBrokerProps);
        this.additionalBackupClusterClientsConfigs.putAll(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.superUserConfig());
        this.backupWorkerProps.putAll(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.superUserConfig());
        HashMap<String, String> additionalConfig = new HashMap<String, String>(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.superUserConfig()){
            {
                this.put("forwarding.admin.class", FakeForwardingAdminWithLocalMetadata.class.getName());
            }
        };
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.superUserConfig().forEach((property, value) -> {
            additionalConfig.put("consumer.override." + property, (String)value);
            additionalConfig.put("producer.override." + property, (String)value);
            additionalConfig.put("consumer." + property, (String)value);
            additionalConfig.put("producer." + property, (String)value);
        });
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.connectorUserConfig().forEach((property, value) -> {
            additionalConfig.put("admin." + property, (String)value);
            additionalConfig.put("admin.override." + property, (String)value);
        });
        this.startClusters((Map<String, String>)additionalConfig);
        this.primary.kafka().createAdminClient().createAcls(Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)))).all().get();
        this.backup.kafka().createAdminClient().createAcls(Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)))).all().get();
    }

    @Override
    @AfterEach
    public void shutdownClusters() throws Exception {
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.deleteAllACLs(this.primary.kafka());
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.deleteAllACLs(this.backup.kafka());
        FakeLocalMetadataStore.clear();
        super.shutdownClusters();
    }

    @Test
    public void testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() throws Exception {
        this.produceMessages((Producer<byte[], byte[]>)this.primaryProducer, "test-topic-1");
        this.produceMessages((Producer<byte[], byte[]>)this.backupProducer, "test-topic-1");
        String consumerGroupName = "consumer-group-testReplication";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        this.warmUpConsumer(consumerProps);
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "backup.test-topic-1");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "primary.test-topic-1");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "mm2-offset-syncs.backup.internal");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "backup.checkpoints.internal");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "backup.heartbeats");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "mm2-offset-syncs.primary.internal");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "primary.checkpoints.internal");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "primary.heartbeats");
        this.waitForTopicToPersistInFakeLocalMetadataStore("backup.test-topic-1");
        this.waitForTopicToPersistInFakeLocalMetadataStore("primary.test-topic-1");
        this.waitForTopicToPersistInFakeLocalMetadataStore("mm2-offset-syncs.backup.internal");
        this.waitForTopicToPersistInFakeLocalMetadataStore("backup.checkpoints.internal");
        this.waitForTopicToPersistInFakeLocalMetadataStore("backup.heartbeats");
        this.waitForTopicToPersistInFakeLocalMetadataStore("mm2-offset-syncs.primary.internal");
        this.waitForTopicToPersistInFakeLocalMetadataStore("primary.checkpoints.internal");
        this.waitForTopicToPersistInFakeLocalMetadataStore("primary.heartbeats");
        this.waitForTopicToPersistInFakeLocalMetadataStore("heartbeats");
    }

    @Test
    public void testCreatePartitionsUseProvidedForwardingAdmin() throws Exception {
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        this.produceMessages((Producer<byte[], byte[]>)this.backupProducer, "test-topic-1");
        this.produceMessages((Producer<byte[], byte[]>)this.primaryProducer, "test-topic-1");
        String consumerGroupName = "consumer-group-testReplication";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        this.warmUpConsumer(consumerProps);
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "backup.test-topic-1");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "primary.test-topic-1");
        this.waitForTopicToPersistInFakeLocalMetadataStore("backup.test-topic-1");
        this.waitForTopicToPersistInFakeLocalMetadataStore("primary.test-topic-1");
        Map<String, NewPartitions> newPartitions = Collections.singletonMap("test-topic-1", NewPartitions.increaseTo((int)11));
        this.primary.kafka().createAdminClient().createPartitions(newPartitions).all().get();
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicPartitionCreated(this.backup, "primary.test-topic-1", 11);
        this.waitForTopicConfigPersistInFakeLocalMetaDataStore("primary.test-topic-1", "partitions", String.valueOf(11));
    }

    @Test
    public void testSyncTopicConfigUseProvidedForwardingAdmin() throws Exception {
        this.mm2Props.put("sync.topic.configs.enabled", "true");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        this.produceMessages((Producer<byte[], byte[]>)this.backupProducer, "test-topic-1");
        this.produceMessages((Producer<byte[], byte[]>)this.primaryProducer, "test-topic-1");
        String consumerGroupName = "consumer-group-testReplication";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        this.warmUpConsumer(consumerProps);
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "backup.test-topic-1");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "primary.test-topic-1");
        Assertions.assertEquals((Object)"compact", (Object)MirrorConnectorsWithCustomForwardingAdminIntegrationTest.getTopicConfig(this.backup.kafka(), "primary.test-topic-1", "cleanup.policy"), (String)"topic config was synced");
        this.waitForTopicToPersistInFakeLocalMetadataStore("backup.test-topic-1");
        this.waitForTopicToPersistInFakeLocalMetadataStore("primary.test-topic-1");
        this.waitForTopicConfigPersistInFakeLocalMetaDataStore("primary.test-topic-1", "cleanup.policy", "compact");
    }

    @Test
    public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception {
        this.mm2Props.put("sync.topic.acls.enabled", "true");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        List<AclBinding> aclBindings = Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
        this.primary.kafka().createAdminClient().createAcls(aclBindings).all().get();
        this.backup.kafka().createAdminClient().createAcls(aclBindings).all().get();
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.primary, "backup.test-topic-1");
        MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicCreated(this.backup, "primary.test-topic-1");
        AclBinding expectedACLOnBackupCluster = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "primary.test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        AclBinding expectedACLOnPrimaryCluster = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "backup.test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        Assertions.assertTrue((boolean)MirrorConnectorsWithCustomForwardingAdminIntegrationTest.getAclBindings(this.backup.kafka(), "primary.test-topic-1").contains(expectedACLOnBackupCluster), (String)"topic ACLs was synced");
        Assertions.assertTrue((boolean)MirrorConnectorsWithCustomForwardingAdminIntegrationTest.getAclBindings(this.primary.kafka(), "backup.test-topic-1").contains(expectedACLOnPrimaryCluster), (String)"topic ACLs was synced");
        Assertions.assertTrue((boolean)FakeLocalMetadataStore.aclBindings("dummy").containsAll(Arrays.asList(expectedACLOnBackupCluster, expectedACLOnPrimaryCluster)));
    }

    void waitForTopicToPersistInFakeLocalMetadataStore(String topicName) throws InterruptedException {
        TestUtils.waitForCondition(() -> FakeLocalMetadataStore.containsTopic(topicName), (long)60000L, (String)("Topic: " + topicName + " didn't get created in the FakeLocalMetadataStore"));
    }

    void waitForTopicConfigPersistInFakeLocalMetaDataStore(String topicName, String configName, String expectedConfigValue) throws InterruptedException {
        TestUtils.waitForCondition(() -> FakeLocalMetadataStore.topicConfig(topicName).getOrDefault(configName, "").equals(expectedConfigValue), (long)60000L, (String)("Topic: " + topicName + "'s configs don't have " + configName + ":" + expectedConfigValue));
    }
}

