/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror.integration;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Runs the scenarios inherited from {@link MirrorConnectorsIntegrationBaseTest}
 * with exactly-once source support set to {@code ENABLED} for both the
 * {@code primary} and {@code backup} entries of the MirrorMaker configuration,
 * and extends the replication scenario with checks that records are replicated
 * again after connector offsets are altered or reset.
 */
@Tag("integration")
public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectorsIntegrationBaseTest {

    /**
     * Adds the exactly-once settings to the MirrorMaker properties and the
     * transaction-log settings to both broker property sets, then delegates to
     * the base class to start the clusters.
     *
     * @throws Exception if the base class fails to start the clusters
     */
    @BeforeEach
    @Override
    public void startClusters() throws Exception {
        String exactlyOnceEnabled = DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString();
        mm2Props.put("primary.exactly.once.source.support", exactlyOnceEnabled);
        mm2Props.put("backup.exactly.once.source.support", exactlyOnceEnabled);

        // NOTE(review): a replication factor / min ISR of 1 presumably matches the
        // size of the embedded clusters - confirm against the base class setup.
        Arrays.asList(primaryBrokerProps, backupBrokerProps).forEach(clusterProps -> {
            clusterProps.put("transaction.state.log.replication.factor", "1");
            clusterProps.put("transaction.state.log.min.isr", "1");
        });

        super.startClusters();
    }

    /**
     * Runs the inherited replication test, then exercises connector offset
     * management on the backup cluster in two phases:
     * <ol>
     *   <li>with only the source connector stopped, the offsets for
     *       {@code test-topic-1} are altered to {@code 0} and the offsets for
     *       {@code test-topic-2} are reset;</li>
     *   <li>with every connector in {@code CONNECTOR_LIST} stopped, all
     *       connector offsets are reset.</li>
     * </ol>
     * After each phase the connectors are resumed and the number of records
     * consumable from the two remote topics on the backup cluster is asserted.
     *
     * @throws Exception if the inherited test or any cluster operation fails
     */
    @Test
    @Override
    public void testReplication() throws Exception {
        super.testReplication();

        String mirroredTopic1 = remoteTopicName("test-topic-1", "primary");
        String mirroredTopic2 = remoteTopicName("test-topic-2", "primary");

        // Phase 1: touch only the source connector's offsets.
        stopMirrorMakerConnectors(backup, MirrorSourceConnector.class);
        // The supplied function yields offset 0 regardless of its argument.
        alterMirrorMakerSourceConnectorOffsets(backup, ignored -> 0L, "test-topic-1");
        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);

        // NOTE(review): the expected counts look like constants inlined from the
        // base class by the compiler - confirm their derivation there.
        int expectedOnTopic1 = 190;
        int seenOnTopic1 = backup.kafka()
                .consume(expectedOnTopic1, 30000L, new String[]{mirroredTopic1})
                .count();
        Assertions.assertEquals(expectedOnTopic1, seenOnTopic1,
                "Records were not re-replicated to backup cluster after altering offsets.");

        int expectedOnTopic2 = 20;
        int seenOnTopic2 = backup.kafka()
                .consume(expectedOnTopic2, 30000L, new String[]{mirroredTopic2})
                .count();
        Assertions.assertEquals(expectedOnTopic2, seenOnTopic2,
                "New topic was not re-replicated to backup cluster after altering offsets.");

        // Phase 2: wipe the offsets of every connector in CONNECTOR_LIST.
        Class[] allConnectors = CONNECTOR_LIST.toArray(new Class[0]);
        stopMirrorMakerConnectors(backup, allConnectors);
        resetAllMirrorMakerConnectorOffsets(backup, allConnectors);
        resumeMirrorMakerConnectors(backup, allConnectors);

        expectedOnTopic1 += 100;
        seenOnTopic1 = backup.kafka()
                .consume(expectedOnTopic1, 30000L, new String[]{mirroredTopic1})
                .count();
        Assertions.assertEquals(expectedOnTopic1, seenOnTopic1,
                "Records were not re-replicated to backup cluster after resetting offsets.");

        expectedOnTopic2 += 10;
        seenOnTopic2 = backup.kafka()
                .consume(expectedOnTopic2, 30000L, new String[]{mirroredTopic2})
                .count();
        Assertions.assertEquals(expectedOnTopic2, seenOnTopic2,
                "New topic was not re-replicated to backup cluster after resetting offsets.");
    }
}

