/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.mirror.ConfigPropertyFilter;
import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.DefaultTopicFilter;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.mirror.MirrorSourceConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.TestUtils;
import org.apache.kafka.connect.mirror.TopicFilter;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class MirrorSourceConnectorTest {
    /**
     * Builds a {@link ConfigPropertyFilter} that accepts every property name.
     *
     * @return a filter whose {@code shouldReplicateConfigProperty} always returns {@code true}
     */
    private ConfigPropertyFilter getConfigPropertyFilter() {
        ConfigPropertyFilter acceptEverything = new ConfigPropertyFilter() {
            @Override
            public boolean shouldReplicateConfigProperty(String propertyName) {
                return true;
            }
        };
        return acceptEverything;
    }

    /**
     * With the default topic filter, both {@code heartbeats} and {@code us-west.heartbeats}
     * must be selected for replication.
     */
    @Test
    public void testReplicatesHeartbeatsByDefault() {
        SourceAndTarget clusters = new SourceAndTarget("source", "target");
        MirrorSourceConnector connector = new MirrorSourceConnector(
                clusters, new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());

        boolean localHeartbeats = connector.shouldReplicateTopic("heartbeats");
        Assertions.assertTrue(localHeartbeats, "should replicate heartbeats");

        boolean upstreamHeartbeats = connector.shouldReplicateTopic("us-west.heartbeats");
        Assertions.assertTrue(upstreamHeartbeats, "should replicate upstream heartbeats");
    }

    /**
     * Even when the topic filter rejects every topic, {@code heartbeats} and
     * {@code us-west.heartbeats} must still be selected for replication.
     */
    @Test
    public void testReplicatesHeartbeatsDespiteFilter() {
        SourceAndTarget clusters = new SourceAndTarget("source", "target");
        // Topic filter that refuses everything.
        MirrorSourceConnector connector = new MirrorSourceConnector(
                clusters, new DefaultReplicationPolicy(), topic -> false, new DefaultConfigPropertyFilter());

        boolean localHeartbeats = connector.shouldReplicateTopic("heartbeats");
        Assertions.assertTrue(localHeartbeats, "should replicate heartbeats");

        boolean upstreamHeartbeats = connector.shouldReplicateTopic("us-west.heartbeats");
        Assertions.assertTrue(upstreamHeartbeats, "should replicate upstream heartbeats");
    }

    /**
     * Using {@link DefaultReplicationPolicy} and an accept-all topic filter, checks that
     * {@code shouldReplicateTopic} rejects the listed topic names carrying the {@code target}
     * alias and accepts {@code topic1} and {@code source.topic1}.
     */
    @Test
    public void testNoCycles() {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                topic -> true,
                getConfigPropertyFilter());

        List<String> cyclicTopics = Arrays.asList(
                "target.topic1",
                "target.source.topic1",
                "source.target.topic1",
                "target.source.target.topic1",
                "source.target.source.topic1");
        for (String cyclicTopic : cyclicTopics) {
            Assertions.assertFalse(connector.shouldReplicateTopic(cyclicTopic), "should not allow cycles");
        }

        List<String> acceptableTopics = Arrays.asList("topic1", "source.topic1");
        for (String acceptableTopic : acceptableTopics) {
            Assertions.assertTrue(connector.shouldReplicateTopic(acceptableTopic), "should allow anything else");
        }
    }

    /**
     * Using {@link IdentityReplicationPolicy} and an accept-all topic filter, checks that
     * ordinary topics are replicated even when their names contain cluster aliases, while
     * alias-prefixed heartbeat topics that include {@code target} are rejected.
     */
    @Test
    public void testIdentityReplication() {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new IdentityReplicationPolicy(),
                topic -> true,
                getConfigPropertyFilter());

        List<String> aliasPrefixedTopics = Arrays.asList(
                "target.topic1",
                "target.source.topic1",
                "source.target.topic1",
                "target.source.target.topic1",
                "source.target.source.topic1");
        for (String name : aliasPrefixedTopics) {
            Assertions.assertTrue(connector.shouldReplicateTopic(name), "should allow cycles");
        }

        List<String> normalTopics = Arrays.asList("topic1", "othersource.topic1");
        for (String name : normalTopics) {
            Assertions.assertTrue(connector.shouldReplicateTopic(name), "should allow normal topics");
        }

        List<String> cyclicHeartbeats = Arrays.asList(
                "target.heartbeats",
                "target.source.heartbeats",
                "source.target.heartbeats",
                "target.source.target.heartbeats",
                "source.target.source.heartbeats");
        for (String name : cyclicHeartbeats) {
            Assertions.assertFalse(connector.shouldReplicateTopic(name), "should not allow heartbeat cycles");
        }

        List<String> plainHeartbeats = Arrays.asList("heartbeats", "othersource.heartbeats");
        for (String name : plainHeartbeats) {
            Assertions.assertTrue(connector.shouldReplicateTopic(name), "should allow heartbeat topics");
        }
    }

    /**
     * Checks {@code shouldReplicateAcl} on two literal topic ACLs: an ALLOW WRITE binding is
     * rejected and an ALLOW ALL binding is accepted.
     */
    @Test
    public void testAclFiltering() {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                topic -> true,
                getConfigPropertyFilter());

        AclBinding allowWrite = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
                new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW));
        Assertions.assertFalse(connector.shouldReplicateAcl(allowWrite), "should not replicate ALLOW WRITE");

        AclBinding allowAll = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
                new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
        Assertions.assertTrue(connector.shouldReplicateAcl(allowAll), "should replicate ALLOW ALL");
    }

    /**
     * Checks how {@code targetAclBinding} rewrites topic ACLs for the target cluster:
     * the topic name gains the {@code source.} prefix, ALLOW ALL becomes ALLOW READ,
     * and DENY ALL is left as DENY ALL.
     */
    @Test
    public void testAclTransformation() {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                x -> true,
                this.getConfigPropertyFilter());

        AclBinding allowAllAclBinding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
                new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
        String expectedRemoteTopicName = "source." + allowAllAclBinding.pattern().name();
        Assertions.assertEquals(expectedRemoteTopicName, processedAllowAllAclBinding.pattern().name(),
                "should change topic name");
        // JUnit's assertEquals takes (expected, actual); keeping that order makes the
        // "expected: <...> but was: <...>" failure output point at the right value.
        Assertions.assertEquals(AclOperation.READ, processedAllowAllAclBinding.entry().operation(),
                "should change ALL to READ");
        Assertions.assertEquals(AclPermissionType.ALLOW, processedAllowAllAclBinding.entry().permissionType(),
                "should not change ALLOW");

        AclBinding denyAllAclBinding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
                new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
        AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
        Assertions.assertEquals(AclOperation.ALL, processedDenyAllAclBinding.entry().operation(),
                "should not change ALL");
        Assertions.assertEquals(AclPermissionType.DENY, processedDenyAllAclBinding.entry().permissionType(),
                "should not change DENY");
    }

    /**
     * Simulates a source cluster whose {@code describeAcls} future fails with a
     * {@link SecurityDisabledException} and checks the connector's logging across three
     * {@code syncTopicAcls} calls: the "Consider disabling topic ACL syncing" hint appears once,
     * the two later calls log "skipping topic ACL sync" instead, and the target admin client
     * is never touched.
     */
    @Test
    public void testNoBrokerAclAuthorizer() throws Exception {
        Admin sourceAdmin = Mockito.mock(Admin.class);
        Admin targetAdmin = Mockito.mock(Admin.class);
        MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin);

        ExecutionException describeAclsFailure = new ExecutionException(
                "Failed to describe ACLs",
                new SecurityDisabledException("No ACL authorizer configured on this broker"));
        @SuppressWarnings("unchecked")
        KafkaFuture<Collection<AclBinding>> describeAclsFuture = Mockito.mock(KafkaFuture.class);
        Mockito.when(describeAclsFuture.get()).thenThrow(describeAclsFailure);
        DescribeAclsResult describeAclsResult = Mockito.mock(DescribeAclsResult.class);
        Mockito.when(describeAclsResult.values()).thenReturn(describeAclsFuture);
        Mockito.when(sourceAdmin.describeAcls(Mockito.any())).thenReturn(describeAclsResult);

        try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) {
            LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class);
            // Counts captured log messages containing the given fragment at the time of the call.
            Function<String, Long> occurrencesOf = fragment -> connectorLogs.getMessages().stream()
                    .filter(message -> message.contains(fragment))
                    .count();

            connector.syncTopicAcls();
            long disableHints = occurrencesOf.apply("Consider disabling topic ACL syncing");
            Assertions.assertEquals(1L, disableHints, "Should have recommended that user disable ACL syncing");
            long skipNotices = occurrencesOf.apply("skipping topic ACL sync");
            Assertions.assertEquals(0L, skipNotices,
                    "Should not have logged ACL sync skip at same time as suggesting ACL sync be disabled");

            connector.syncTopicAcls();
            connector.syncTopicAcls();
            disableHints = occurrencesOf.apply("Consider disabling topic ACL syncing");
            Assertions.assertEquals(1L, disableHints,
                    "Should not have recommended that user disable ACL syncing more than once");
            skipNotices = occurrencesOf.apply("skipping topic ACL sync");
            Assertions.assertEquals(2L, skipNotices,
                    "Should have logged ACL sync skip instead of suggesting disabling ACL syncing");
        }

        Mockito.verifyNoInteractions(targetAdmin);
    }

    /**
     * Checks {@code targetConfig(config, true)} with the default property filter: an explicitly
     * set property and a {@code DEFAULT_CONFIG}-sourced property are both kept, while
     * {@code min.insync.replicas} is dropped.
     */
    @Test
    public void testConfigPropertyFiltering() {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                topic -> true,
                new DefaultConfigPropertyFilter());

        List<ConfigEntry> entries = Arrays.asList(
                new ConfigEntry("name-1", "value-1"),
                new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
                        Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""),
                new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = connector.targetConfig(new Config(entries), true);

        Set<String> replicatedNames = targetConfig.entries().stream()
                .map(ConfigEntry::name)
                .collect(Collectors.toSet());
        Assertions.assertTrue(replicatedNames.contains("name-1"), "should replicate properties");
        Assertions.assertTrue(replicatedNames.contains("name-2"), "should include default properties");
        Assertions.assertFalse(replicatedNames.contains("min.insync.replicas"),
                "should not replicate excluded properties");
    }

    /**
     * Checks {@code targetConfig(config, false)} with the default property filter: the explicitly
     * set property is kept, while the {@code DEFAULT_CONFIG}-sourced property and
     * {@code min.insync.replicas} are both dropped.
     */
    @Test
    @Deprecated
    public void testConfigPropertyFilteringWithAlterConfigs() {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                topic -> true,
                new DefaultConfigPropertyFilter());

        List<ConfigEntry> entries = Arrays.asList(
                new ConfigEntry("name-1", "value-1"),
                new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
                        Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""),
                new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = connector.targetConfig(new Config(entries), false);

        Set<String> replicatedNames = targetConfig.entries().stream()
                .map(ConfigEntry::name)
                .collect(Collectors.toSet());
        Assertions.assertTrue(replicatedNames.contains("name-1"), "should replicate properties");
        Assertions.assertFalse(replicatedNames.contains("name-2"), "should not replicate default properties");
        Assertions.assertFalse(replicatedNames.contains("min.insync.replicas"),
                "should not replicate excluded properties");
    }

    /**
     * Checks {@code targetConfig(config, false)} when the property filter is configured with
     * {@code use.defaults.from=source}: the {@code DEFAULT_CONFIG}-sourced property is kept
     * alongside the explicitly set one, while {@code min.insync.replicas} is still dropped.
     */
    @Test
    @Deprecated
    public void testConfigPropertyFilteringWithAlterConfigsAndSourceDefault() {
        DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
        filter.configure(Collections.singletonMap("use.defaults.from", "source"));
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                topic -> true,
                filter);

        List<ConfigEntry> entries = Arrays.asList(
                new ConfigEntry("name-1", "value-1"),
                new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
                        Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""),
                new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = connector.targetConfig(new Config(entries), false);

        Set<String> replicatedNames = targetConfig.entries().stream()
                .map(ConfigEntry::name)
                .collect(Collectors.toSet());
        Assertions.assertTrue(replicatedNames.contains("name-1"), "should replicate properties");
        Assertions.assertTrue(replicatedNames.contains("name-2"), "should include default properties");
        Assertions.assertFalse(replicatedNames.contains("min.insync.replicas"),
                "should not replicate excluded properties");
    }

    /**
     * Configures the property filter with a custom {@code config.properties.exclude} list and
     * stubs the single-argument {@code createNewTopics} on a spy so that, when it is invoked,
     * the {@link NewTopic} for {@code source.testtopic} is inspected: {@code name-1} must be
     * present, and {@code min.insync.replicas} and {@code exclude_param.param1} must be absent.
     */
    @Test
    public void testNewTopicConfigs() throws Exception {
        Map<String, String> filterConfig = new HashMap<>();
        filterConfig.put("config.properties.exclude", "follower\\.replication\\.throttled\\.replicas, leader\\.replication\\.throttled\\.replicas, message\\.timestamp\\.difference\\.max\\.ms, message\\.timestamp\\.type, unclean\\.leader\\.election\\.enable, min\\.insync\\.replicas,exclude_param.*");
        DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
        filter.configure(filterConfig);
        MirrorSourceConnector connector = Mockito.spy(new MirrorSourceConnector(
                new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), x -> true, filter));

        String topic = "testtopic";
        String remoteTopic = "source." + topic;
        Config config = new Config(Arrays.asList(
                new ConfigEntry("name-1", "value-1"),
                new ConfigEntry("exclude_param.param1", "value-param1"),
                new ConfigEntry("min.insync.replicas", "2")));
        Mockito.doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(Mockito.any());

        Mockito.doAnswer(invocation -> {
            Map<String, NewTopic> newTopics = invocation.getArgument(0);
            Assertions.assertNotNull(newTopics.get(remoteTopic));
            Map<String, String> targetConfig = newTopics.get(remoteTopic).configs();
            Assertions.assertNotNull(targetConfig.get("name-1"), "should replicate properties");
            for (String excluded : Arrays.asList("min.insync.replicas", "exclude_param.param1")) {
                Assertions.assertNull(targetConfig.get(excluded),
                        "should not replicate excluded properties " + excluded);
            }
            return null;
        }).when(connector).createNewTopics(Mockito.any());

        connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L));
        Mockito.verify(connector).createNewTopics(Mockito.any(), Mockito.any());
    }

    /**
     * With {@code use.incremental.alter.configs=requested} and an admin client whose
     * {@code incrementalAlterConfigs} result fails with {@link UnsupportedVersionException}:
     * the first {@code syncTopicConfigs} must go through {@code incrementalAlterConfigs}, and
     * after a second sync {@code deprecatedAlterConfigs} must have been called exactly once.
     */
    @Test
    @Deprecated
    public void testIncrementalAlterConfigsRequested() throws Exception {
        Map<String, String> props = TestUtils.makeProps(new String[0]);
        props.put("use.incremental.alter.configs", "requested");
        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
        Admin admin = Mockito.mock(Admin.class);
        MirrorSourceConnector connector = Mockito.spy(new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                connectorConfig,
                new DefaultConfigPropertyFilter(),
                admin));

        String topic = "testtopic";
        Config config = new Config(Collections.singletonList(new ConfigEntry("name-1", "value-1")));
        Mockito.doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(Mockito.any());
        Mockito.doReturn(AdminClientTestUtils.alterConfigsResult(
                        new ConfigResource(ConfigResource.Type.TOPIC, topic),
                        new UnsupportedVersionException("Unsupported API")))
                .when(admin).incrementalAlterConfigs(Mockito.any());
        Mockito.doNothing().when(connector).deprecatedAlterConfigs(Mockito.any());

        Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);

        connector.syncTopicConfigs();
        Mockito.verify(connector).incrementalAlterConfigs(topicConfigs);

        connector.syncTopicConfigs();
        Mockito.verify(connector, Mockito.times(1)).deprecatedAlterConfigs(topicConfigs);
    }

    /**
     * With {@code use.incremental.alter.configs=required}, checks the operations handed to the
     * admin client's {@code incrementalAlterConfigs}: a single resource ({@code source.testtopic})
     * with a SET for the explicitly set entry followed by a DELETE for the
     * {@code DEFAULT_CONFIG}-sourced entry.
     */
    @Test
    @Deprecated
    public void testIncrementalAlterConfigsRequired() throws Exception {
        Map<String, String> props = TestUtils.makeProps(new String[0]);
        props.put("use.incremental.alter.configs", "required");
        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
        Admin admin = Mockito.mock(Admin.class);
        MirrorSourceConnector connector = Mockito.spy(new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                connectorConfig,
                new DefaultConfigPropertyFilter(),
                admin));

        String topic = "testtopic";
        String remoteTopic = "source." + topic;
        ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1");
        ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2",
                ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
                Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
        Config config = new Config(Arrays.asList(entryWithNonDefaultValue, entryWithDefaultValue));
        Mockito.doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(Mockito.any());

        Mockito.doAnswer(invocation -> {
            Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0);
            Assertions.assertNotNull(configOps);
            Assertions.assertEquals(1, configOps.size());

            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, remoteTopic);
            List<AlterConfigOp> expectedOps = Arrays.asList(
                    new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET),
                    new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE));
            Assertions.assertEquals(expectedOps, configOps.get(configResource));

            return AdminClientTestUtils.alterConfigsResult(configResource);
        }).when(admin).incrementalAlterConfigs(Mockito.any());

        connector.syncTopicConfigs();

        Map<String, Config> topicConfigs = Collections.singletonMap(remoteTopic, config);
        Mockito.verify(connector).incrementalAlterConfigs(topicConfigs);
    }

    /**
     * With {@code use.incremental.alter.configs=required} and an admin client whose
     * {@code incrementalAlterConfigs} result fails with {@link UnsupportedVersionException},
     * {@code syncTopicConfigs} must report a {@link ConnectException} through
     * {@link ConnectorContext#raiseError}.
     */
    @Test
    @Deprecated
    public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
        Map<String, String> props = TestUtils.makeProps(new String[0]);
        props.put("use.incremental.alter.configs", "required");
        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
        Admin admin = Mockito.mock(Admin.class);
        ConnectorContext connectorContext = Mockito.mock(ConnectorContext.class);
        MirrorSourceConnector connector = Mockito.spy(new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                connectorConfig,
                new DefaultConfigPropertyFilter(),
                admin));
        connector.initialize(connectorContext);

        String topic = "testtopic";
        Config config = new Config(Collections.singletonList(new ConfigEntry("name-1", "value-1")));
        Mockito.doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(Mockito.any());
        Mockito.doReturn(AdminClientTestUtils.alterConfigsResult(
                        new ConfigResource(ConfigResource.Type.TOPIC, topic),
                        new UnsupportedVersionException("Unsupported API")))
                .when(admin).incrementalAlterConfigs(Mockito.any());

        connector.syncTopicConfigs();

        Mockito.verify(connectorContext).raiseError(ArgumentMatchers.isA(ConnectException.class));
    }

    /**
     * With {@code use.incremental.alter.configs=never}, {@code syncTopicConfigs} must call
     * {@code deprecatedAlterConfigs} and must not call {@code incrementalAlterConfigs}.
     * The connector is built with a {@code null} admin client.
     */
    @Test
    @Deprecated
    public void testIncrementalAlterConfigsNeverUsed() throws Exception {
        Map<String, String> props = TestUtils.makeProps(new String[0]);
        props.put("use.incremental.alter.configs", "never");
        MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props);
        MirrorSourceConnector connector = Mockito.spy(new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                connectorConfigs,
                new DefaultConfigPropertyFilter(),
                null));

        String topic = "testtopic";
        Config config = new Config(Collections.singletonList(new ConfigEntry("name-1", "value-1")));
        Mockito.doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(Mockito.any());
        Mockito.doNothing().when(connector).deprecatedAlterConfigs(Mockito.any());

        connector.syncTopicConfigs();

        Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
        Mockito.verify(connector).deprecatedAlterConfigs(topicConfigs);
        Mockito.verify(connector, Mockito.never()).incrementalAlterConfigs(Mockito.any());
    }

    /**
     * Gives the connector twelve known source partitions (t0 with eight, t1 and t2 with two
     * each), requests three task configs, and checks the exact
     * {@code task.assigned.partitions} value of each one.
     */
    @Test
    public void testMirrorSourceConnectorTaskConfig() {
        List<TopicPartition> knownSourceTopicPartitions = new ArrayList<>();
        // t0-0 .. t0-7, followed by t1-0, t1-1, t2-0, t2-1 (insertion order matters).
        for (int partition = 0; partition < 8; partition++) {
            knownSourceTopicPartitions.add(new TopicPartition("t0", partition));
        }
        for (String smallTopic : Arrays.asList("t1", "t2")) {
            for (int partition = 0; partition < 2; partition++) {
                knownSourceTopicPartitions.add(new TopicPartition(smallTopic, partition));
            }
        }

        MirrorSourceConfig config = new MirrorSourceConfig(TestUtils.makeProps(new String[0]));
        MirrorSourceConnector connector = new MirrorSourceConnector(knownSourceTopicPartitions, config);
        List<Map<String, String>> output = connector.taskConfigs(3);

        Map<String, String> t1 = output.get(0);
        Assertions.assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get("task.assigned.partitions"),
                "Config for t1 is incorrect");

        Map<String, String> t2 = output.get(1);
        Assertions.assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get("task.assigned.partitions"),
                "Config for t2 is incorrect");

        Map<String, String> t3 = output.get(2);
        Assertions.assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get("task.assigned.partitions"),
                "Config for t3 is incorrect");
    }

    /**
     * Exercises {@code refreshTopicPartitions} on a spy whose source cluster reports one
     * partition of {@code topic} and whose target cluster initially reports none.
     *
     * <p>Two refreshes in that state must each invoke {@code computeAndCreateTopicPartitions}
     * and {@code createNewTopics} with a {@code source.topic} {@link NewTopic} carrying the
     * source topic's config, and must never invoke {@code createNewPartitions}. Once the target
     * reports {@code source.topic-0}, a further refresh must not invoke
     * {@code computeAndCreateTopicPartitions} again.
     */
    @Test
    public void testRefreshTopicPartitions() throws Exception {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                new DefaultTopicFilter(),
                new DefaultConfigPropertyFilter());
        connector.initialize(Mockito.mock(ConnectorContext.class));
        connector = Mockito.spy(connector);

        Config topicConfig = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact"),
                new ConfigEntry("segment.bytes", "100")));
        Map<String, Config> configs = Collections.singletonMap("topic", topicConfig);

        List<TopicPartition> sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
        Mockito.doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
        Mockito.doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
        Mockito.doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("topic"));
        Mockito.doNothing().when(connector).createNewTopics(Mockito.any());

        connector.refreshTopicPartitions();
        // The target still reports no partitions, so this refresh is expected to retry creation.
        connector.refreshTopicPartitions();

        Map<String, String> configMap = MirrorSourceConnector.configToMap(topicConfig);
        Assertions.assertEquals(2, configMap.size(), "configMap has incorrect size");

        Map<String, NewTopic> expectedNewTopics = new HashMap<>();
        // NewTopic's replication-factor parameter is a short; a bare int literal does not compile.
        expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configMap));

        Mockito.verify(connector, Mockito.times(2)).computeAndCreateTopicPartitions();
        Mockito.verify(connector, Mockito.times(2)).createNewTopics(Mockito.eq(expectedNewTopics));
        Mockito.verify(connector, Mockito.times(0)).createNewPartitions(Mockito.any());

        List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
        Mockito.doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
        connector.refreshTopicPartitions();

        // Still two invocations in total: the third refresh must not have added another.
        Mockito.verify(connector, Mockito.times(2)).computeAndCreateTopicPartitions();
    }

    /**
     * Exercises {@code refreshTopicPartitions} on a spy whose target cluster already reports
     * {@code source.topic-0} while the source cluster reports no partitions.
     *
     * <p>Two refreshes in that state must not invoke {@code computeAndCreateTopicPartitions}.
     * Once the source reports {@code topic-0}, the next refresh must invoke it exactly once.
     */
    @Test
    public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
        MirrorSourceConnector connector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                new DefaultReplicationPolicy(),
                new DefaultTopicFilter(),
                new DefaultConfigPropertyFilter());
        connector.initialize(Mockito.mock(ConnectorContext.class));
        connector = Mockito.spy(connector);

        Config topicConfig = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact"),
                new ConfigEntry("segment.bytes", "100")));
        Map<String, Config> configs = Collections.singletonMap("source.topic", topicConfig);

        List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
        List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
        Mockito.doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
        Mockito.doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
        Mockito.doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("source.topic"));
        Mockito.doReturn(Collections.emptyMap()).when(connector).describeTopicConfigs(Collections.emptySet());
        Mockito.doNothing().when(connector).createNewTopics(Mockito.any());
        Mockito.doNothing().when(connector).createNewPartitions(Mockito.any());

        // Source reports nothing yet: neither refresh may compute or create topic partitions.
        connector.refreshTopicPartitions();
        connector.refreshTopicPartitions();
        Mockito.verify(connector, Mockito.times(0)).computeAndCreateTopicPartitions();

        // The topic now shows up on the source side.
        sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
        Mockito.doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
        connector.refreshTopicPartitions();
        Mockito.verify(connector, Mockito.times(1)).computeAndCreateTopicPartitions();
    }

    /**
     * Ensures {@code isCycle} does not throw when the replication policy's
     * {@code upstreamTopic} returns {@code null}.
     */
    @Test
    public void testIsCycleWithNullUpstreamTopic() {
        // Policy stub whose upstream-topic lookup always yields null.
        ReplicationPolicy nullUpstreamPolicy = new DefaultReplicationPolicy() {
            @Override
            public String upstreamTopic(String topic) {
                return null;
            }
        };
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector(
                new SourceAndTarget("source", "target"),
                nullUpstreamPolicy,
                new DefaultTopicFilter(),
                new DefaultConfigPropertyFilter());
        Assertions.assertDoesNotThrow(() -> sourceConnector.isCycle(".b"));
    }

    /**
     * Checks the reported exactly-once support for combinations of the default
     * ({@code consumer.isolation.level}) and source-specific
     * ({@code source.consumer.isolation.level}) isolation levels. In each call the first
     * argument is the default level and the second is the source-specific level;
     * {@code null} leaves the property unset.
     */
    @Test
    public void testExactlyOnceSupport() {
        final String committed = "read_committed";
        final String uncommitted = "read_uncommitted";
        final String garbage = "read_garbage";

        // Neither level is read_committed: expected to be unsupported.
        assertExactlyOnceSupport(null, null, false);
        assertExactlyOnceSupport(uncommitted, null, false);
        assertExactlyOnceSupport(null, uncommitted, false);
        assertExactlyOnceSupport(uncommitted, uncommitted, false);

        // Source-specific level is read_committed, or it is unset and the default is
        // read_committed: expected to be supported.
        assertExactlyOnceSupport(committed, null, true);
        assertExactlyOnceSupport(null, committed, true);
        assertExactlyOnceSupport(uncommitted, committed, true);
        assertExactlyOnceSupport(committed, committed, true);

        // An unrecognized level is involved: expected to be unsupported unless the
        // source-specific level is read_committed.
        assertExactlyOnceSupport(garbage, null, false);
        assertExactlyOnceSupport(null, garbage, false);
        assertExactlyOnceSupport(garbage, garbage, false);
        assertExactlyOnceSupport(committed, garbage, false);
        assertExactlyOnceSupport(uncommitted, garbage, false);
        assertExactlyOnceSupport(garbage, uncommitted, false);
        assertExactlyOnceSupport(garbage, committed, true);
    }

    /**
     * Asserts the {@link ExactlyOnceSupport} reported by a fresh {@link MirrorSourceConnector}
     * for the given isolation-level settings.
     *
     * @param defaultIsolationLevel value for {@code consumer.isolation.level}, or {@code null} to leave it unset
     * @param sourceIsolationLevel value for {@code source.consumer.isolation.level}, or {@code null} to leave it unset
     * @param expected {@code true} to expect {@code SUPPORTED}, {@code false} to expect {@code UNSUPPORTED}
     */
    private void assertExactlyOnceSupport(String defaultIsolationLevel, String sourceIsolationLevel, boolean expected) {
        ExactlyOnceSupport wanted;
        if (expected) {
            wanted = ExactlyOnceSupport.SUPPORTED;
        } else {
            wanted = ExactlyOnceSupport.UNSUPPORTED;
        }

        Map<String, String> connectorProps = TestUtils.makeProps(new String[0]);
        if (defaultIsolationLevel != null) {
            connectorProps.put("consumer.isolation.level", defaultIsolationLevel);
        }
        if (sourceIsolationLevel != null) {
            connectorProps.put("source.consumer.isolation.level", sourceIsolationLevel);
        }

        Assertions.assertEquals(wanted, new MirrorSourceConnector().exactlyOnceSupport(connectorProps));
    }

    /**
     * Validates how the {@code exactly.once.support} property is reported by connector
     * validation: no config value for it unless it is {@code required} without a
     * {@code read_committed} consumer isolation level, and an unrelated invalid property
     * ({@code offset.lag.max}) is reported on its own without changing that outcome.
     */
    @Test
    public void testExactlyOnceSupportValidation() {
        final String eosProperty = "exactly.once.support";
        final String isolationProperty = "consumer.isolation.level";
        final String lagProperty = "offset.lag.max";
        final String isolationSubject = "'isolation.level' consumer property";

        Map<String, String> props = TestUtils.makeProps(new String[0]);

        // Unset, "requested" and an unrecognized value: nothing reported for the property.
        Assertions.assertEquals(Optional.empty(), validateProperty(eosProperty, props));
        props.put(eosProperty, "requested");
        Assertions.assertEquals(Optional.empty(), validateProperty(eosProperty, props));
        props.put(eosProperty, "garbage");
        Assertions.assertEquals(Optional.empty(), validateProperty(eosProperty, props));

        // "required" with no isolation level configured: one error pointing at isolation.level.
        props.put(eosProperty, "required");
        assertSingleErrorMentioning(eosProperty, props, "isolation.level", isolationSubject);

        // "required" together with read_committed: nothing reported.
        props.put(isolationProperty, "read_committed");
        Assertions.assertEquals(Optional.empty(), validateProperty(eosProperty, props));

        // An unrelated invalid property is reported under its own name...
        props.put(lagProperty, "bad");
        assertSingleErrorMentioning(lagProperty, props, "offset.lag.max", "'offset.lag.max' property");
        // ...while the exactly-once property still has nothing reported...
        Assertions.assertEquals(Optional.empty(), validateProperty(eosProperty, props));

        // ...and still gets its single isolation.level error once the isolation level is removed.
        props.remove(isolationProperty);
        assertSingleErrorMentioning(eosProperty, props, "isolation.level", isolationSubject);
    }

    /**
     * Validates {@code props} and asserts that the config value for {@code property} is
     * present and carries exactly one error message containing {@code fragment}.
     *
     * @param property name of the property whose validation result is inspected
     * @param props connector properties to validate
     * @param fragment text the single error message must contain
     * @param subject tail of the assertion failure message describing what should have been mentioned
     */
    private void assertSingleErrorMentioning(String property, Map<String, String> props, String fragment, String subject) {
        Optional<ConfigValue> configValue = validateProperty(property, props);
        Assertions.assertTrue(configValue.isPresent());
        List<String> errorMessages = configValue.get().errorMessages();
        Assertions.assertEquals(1, errorMessages.size());
        String errorMessage = errorMessages.get(0);
        Assertions.assertTrue(errorMessage.contains(fragment), "Error message \"" + errorMessage + "\" should have mentioned the " + subject);
    }

    /**
     * Runs validation on a fresh {@link MirrorSourceConnector} and picks out the config
     * value whose name equals {@code name}.
     *
     * @param name property name to look for in the validation results
     * @param props connector properties to validate
     * @return the matching config value, or an empty {@code Optional} if none was produced;
     *         the assertions fail if more than one value matches or the match is {@code null}
     */
    private Optional<ConfigValue> validateProperty(String name, Map<String, String> props) {
        List<ConfigValue> matches = new ArrayList<>();
        for (ConfigValue candidate : new MirrorSourceConnector().validate(props).configValues()) {
            if (name.equals(candidate.name())) {
                matches.add(candidate);
            }
        }
        Assertions.assertTrue(matches.size() <= 1, "Connector produced multiple config values for '" + name + "' property");
        if (!matches.isEmpty()) {
            ConfigValue only = matches.get(0);
            Assertions.assertNotNull(only, "Connector should not have record null config value for '" + name + "' property");
            return Optional.of(only);
        }
        return Optional.empty();
    }

    /**
     * Expects {@code alterOffsets} to throw {@link ConnectException} when a non-null offset
     * is paired with a partition that has only an unrelated key, or with a {@code null}
     * partition.
     */
    @Test
    public void testAlterOffsetsIncorrectPartitionKey() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Map<String, ?> unrelatedPartition = Collections.singletonMap("unused_partition_key", "unused_partition_value");

        // Partition map with an unrelated key only.
        Assertions.assertThrows(ConnectException.class, () ->
                sourceConnector.alterOffsets(null, Collections.singletonMap(unrelatedPartition, MirrorUtils.wrapOffset(10L))));

        // Null partition with a non-null offset.
        Assertions.assertThrows(ConnectException.class, () ->
                sourceConnector.alterOffsets(null, Collections.singletonMap(null, MirrorUtils.wrapOffset(10L))));
    }

    /**
     * Expects {@code alterOffsets} to accept a complete source partition and to throw
     * {@link ConnectException} once any one of the {@code cluster}, {@code topic} or
     * {@code partition} keys is removed from it.
     */
    @Test
    public void testAlterOffsetsMissingPartitionKey() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Function<Map<String, ?>, Boolean> alterWithPartition = candidate ->
                sourceConnector.alterOffsets(null, Collections.singletonMap(candidate, MirrorUtils.wrapOffset(64L)));

        // Baseline: the untouched partition must be accepted.
        Map<String, Object> completePartition = sourcePartition("t", 3, "us-east-2");
        Assertions.assertTrue(alterWithPartition.apply(completePartition));

        for (String requiredKey : new String[] {"cluster", "topic", "partition"}) {
            Map<String, Object> incompletePartition = new HashMap<>(completePartition);
            incompletePartition.remove(requiredKey);
            Assertions.assertThrows(ConnectException.class, () -> alterWithPartition.apply(incompletePartition));
        }
    }

    /**
     * Expects {@code alterOffsets} to throw {@link ConnectException} when the
     * {@code partition} entry of the source partition holds the string {@code "a string"}.
     */
    @Test
    public void testAlterOffsetsInvalidPartitionPartition() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Map<String, Object> malformedPartition = sourcePartition("t", 3, "us-west-2");
        // Overwrite the partition number with a non-numeric string.
        malformedPartition.put("partition", "a string");
        Assertions.assertThrows(ConnectException.class, () ->
                sourceConnector.alterOffsets(null, Collections.singletonMap(malformedPartition, MirrorUtils.wrapOffset(49L))));
    }

    /**
     * Expects {@code alterOffsets} to return {@code true} for an offsets map holding two
     * partitions of the same topic.
     */
    @Test
    public void testAlterOffsetsMultiplePartitions() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Map<Map<String, ?>, Map<String, ?>> offsetsByPartition = new HashMap<>();
        offsetsByPartition.put(sourcePartition("t1", 0, "primary"), MirrorUtils.wrapOffset(50L));
        offsetsByPartition.put(sourcePartition("t1", 1, "primary"), MirrorUtils.wrapOffset(100L));
        Assertions.assertTrue(sourceConnector.alterOffsets(null, offsetsByPartition));
    }

    /**
     * Expects {@code alterOffsets} to throw {@link ConnectException} when the offset map
     * for a source partition contains only the key {@code unused_offset_key}.
     */
    @Test
    public void testAlterOffsetsIncorrectOffsetKey() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Map<String, ?> misnamedOffset = Collections.singletonMap("unused_offset_key", 0);
        Map<Map<String, ?>, Map<String, ?>> offsetsByPartition =
                Collections.singletonMap(sourcePartition("t1", 2, "backup"), misnamedOffset);
        Assertions.assertThrows(ConnectException.class, () -> sourceConnector.alterOffsets(null, offsetsByPartition));
    }

    /**
     * Feeds a range of values under the {@code offset} key to {@code alterOffsets} and
     * checks which ones are rejected with {@link ConnectException} and which are accepted.
     */
    @Test
    public void testAlterOffsetsOffsetValues() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Function<Object, Boolean> alterWithOffset = value -> sourceConnector.alterOffsets(
                null,
                Collections.singletonMap(sourcePartition("t", 5, "backup"), Collections.singletonMap("offset", value)));

        // Rejected: non-numeric, null, arbitrary object, fractional, negative, and strings
        // (even ones that look like numbers).
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply("nan"));
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply(null));
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply(new Object()));
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply(3.14));
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply(-420));
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply("-420"));
        Assertions.assertThrows(ConnectException.class, () -> alterWithOffset.apply("10"));

        // Accepted: zero, a positive int, and a long one past Integer.MAX_VALUE.
        Assertions.assertTrue(() -> alterWithOffset.apply(0));
        Assertions.assertTrue(() -> alterWithOffset.apply(10));
        Assertions.assertTrue(() -> alterWithOffset.apply(((long) Integer.MAX_VALUE) + 1));
    }

    /**
     * Expects {@code alterOffsets} to return {@code true} both for a single well-formed
     * partition/offset pair and for an empty offsets map.
     */
    @Test
    public void testSuccessfulAlterOffsets() {
        MirrorSourceConnector sourceConnector = new MirrorSourceConnector();
        Map<Map<String, ?>, Map<String, ?>> singleOffset =
                Collections.singletonMap(sourcePartition("t2", 0, "backup"), MirrorUtils.wrapOffset(5L));

        Assertions.assertTrue(sourceConnector.alterOffsets(null, singleOffset));
        // An empty map is accepted as well.
        Assertions.assertTrue(sourceConnector.alterOffsets(null, Collections.emptyMap()));
    }

    /**
     * Expects {@link MirrorSourceConnector#alterOffsets} to return {@code true} whenever the
     * offset for a partition is {@code null} (a tombstone), regardless of whether the
     * partition itself is well-formed, malformed, empty or {@code null}.
     */
    @Test
    public void testAlterOffsetsTombstones() {
        // This test class covers MirrorSourceConnector, so tombstone handling is exercised on it.
        MirrorSourceConnector connector = new MirrorSourceConnector();
        Function<Map<String, ?>, Boolean> alterOffsets = candidate ->
                connector.alterOffsets(null, Collections.singletonMap(candidate, null));

        // Well-formed source partition.
        Map<String, Object> tombstoned = sourcePartition("kips", 875, "apache.kafka");
        Assertions.assertTrue(() -> alterOffsets.apply(tombstoned));

        // "partition" entry replaced by a non-numeric string.
        tombstoned.put("partition", "a string");
        Assertions.assertTrue(() -> alterOffsets.apply(tombstoned));

        // "partition" entry missing altogether.
        tombstoned.remove("partition");
        Assertions.assertTrue(() -> alterOffsets.apply(tombstoned));

        // Null, empty, and unrelated-key partitions.
        Assertions.assertTrue(() -> alterOffsets.apply(null));
        Assertions.assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
        Assertions.assertTrue(() -> alterOffsets.apply(Collections.singletonMap("unused_partition_key", "unused_partition_value")));
    }

    /**
     * Builds a source partition map for the given topic, partition number and source
     * cluster alias by delegating to {@code MirrorUtils.wrapPartition}.
     *
     * @param topic topic name
     * @param partition partition number
     * @param sourceClusterAlias alias of the source cluster
     * @return the map produced by {@code MirrorUtils.wrapPartition}
     */
    private static Map<String, Object> sourcePartition(String topic, int partition, String sourceClusterAlias) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        return MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
    }
}

