/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry;

import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MasterElectorTest
extends ClusterTestHarness {
    @Parameterized.Parameter(value=0)
    public String electorType;
    @Parameterized.Parameter(value=1)
    public int reservationBatchSize;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"kafka", 0}, {"zookeeper", 20});
    }

    private String zkConnect() {
        switch (this.electorType) {
            case "zookeeper": {
                return this.zkConnect;
            }
            case "kafka": {
                return null;
            }
        }
        throw new IllegalArgumentException();
    }

    private String bootstrapServers() {
        switch (this.electorType) {
            case "zookeeper": {
                return null;
            }
            case "kafka": {
                return this.bootstrapServers;
            }
        }
        throw new IllegalArgumentException();
    }

    @Test
    public void testAutoFailover() throws Exception {
        String subject = "testTopic";
        String configSubject = "configTopic";
        List<String> avroSchemas = TestUtils.getRandomCanonicalAvroString(4);
        int port1 = MasterElectorTest.choosePort();
        int port2 = MasterElectorTest.choosePort();
        if (port2 < port1) {
            int tmp = port2;
            port2 = port1;
            port1 = tmp;
        }
        RestApp restApp1 = new RestApp(port1, this.zkConnect(), this.bootstrapServers(), "", AvroCompatibilityLevel.NONE.name, true, null);
        restApp1.start();
        final RestApp restApp2 = new RestApp(port2, this.zkConnect(), this.bootstrapServers(), "", AvroCompatibilityLevel.NONE.name, true, null);
        restApp2.start();
        Assert.assertTrue((String)"Schema registry instance 1 should be the master", (boolean)restApp1.isMaster());
        Assert.assertFalse((String)"Schema registry instance 2 shouldn't be the master", (boolean)restApp2.isMaster());
        Assert.assertEquals((String)"Instance 2's master should be instance 1", (Object)restApp1.myIdentity(), (Object)restApp2.masterIdentity());
        String firstSchema = avroSchemas.get(0);
        boolean firstSchemaExpectedId = true;
        TestUtils.registerAndVerifySchema(restApp1.restClient, firstSchema, 1, "testTopic");
        this.verifyIdAndSchema(restApp2.restClient, 1, firstSchema, "Registered schema should be found on the non-master");
        String secondSchema = avroSchemas.get(1);
        int secondSchemaExpectedId = 2;
        int secondSchemaExpectedVersion = 2;
        Assert.assertEquals((String)"Registering a new schema to the non-master should succeed", (long)2L, (long)restApp2.restClient.registerSchema(secondSchema, "testTopic"));
        Assert.assertEquals((String)"Registered schema should be found on the master", (Object)secondSchema, (Object)restApp1.restClient.getId(2).getSchemaString());
        Assert.assertEquals((String)"Registered schema should be found on the master", (Object)secondSchema, (Object)restApp1.restClient.getVersion("testTopic", 2).getSchema());
        this.verifyIdAndSchema(restApp2.restClient, 2, secondSchema, "Registered schema should be found on the non-master");
        Assert.assertEquals((String)"Registering an existing schema to the master should return its id", (long)2L, (long)restApp1.restClient.registerSchema(secondSchema, "testTopic"));
        Assert.assertEquals((String)"Registering an existing schema to the non-master should return its id", (long)2L, (long)restApp2.restClient.registerSchema(secondSchema, "testTopic"));
        restApp1.restClient.updateCompatibility(AvroCompatibilityLevel.FORWARD.name, "configTopic");
        Assert.assertEquals((String)"New compatibility level should be FORWARD on the master", (Object)AvroCompatibilityLevel.FORWARD.name, (Object)restApp1.restClient.getConfig("configTopic").getCompatibilityLevel());
        this.waitUntilCompatibilityLevelSet(restApp2.restClient, "configTopic", AvroCompatibilityLevel.FORWARD.name, "New compatibility level should be FORWARD on the non-master");
        restApp2.restClient.updateCompatibility(AvroCompatibilityLevel.NONE.name, "configTopic");
        Assert.assertEquals((String)"New compatibility level should be NONE on the master", (Object)AvroCompatibilityLevel.NONE.name, (Object)restApp1.restClient.getConfig("configTopic").getCompatibilityLevel());
        this.waitUntilCompatibilityLevelSet(restApp2.restClient, "configTopic", AvroCompatibilityLevel.NONE.name, "New compatibility level should be NONE on the non-master");
        restApp1.setMaster(null);
        int statusCodeFromRestApp1 = 0;
        String failedSchema = "{\"type\":\"string\"}";
        try {
            restApp1.restClient.registerSchema("{\"type\":\"string\"}", "testTopic");
            Assert.fail((String)"Registration should fail on the master");
        }
        catch (RestClientException e) {
            statusCodeFromRestApp1 = e.getStatus();
        }
        int statusCodeFromRestApp2 = 0;
        try {
            restApp2.restClient.registerSchema("{\"type\":\"string\"}", "testTopic");
            Assert.fail((String)"Registration should fail on the non-master");
        }
        catch (RestClientException e) {
            statusCodeFromRestApp2 = e.getStatus();
        }
        Assert.assertEquals((String)"Status code from a non-master rest app for register schema should be 500", (long)500L, (long)statusCodeFromRestApp1);
        Assert.assertEquals((String)"Error code from the master and the non-master should be the same", (long)statusCodeFromRestApp1, (long)statusCodeFromRestApp2);
        int updateConfigStatusCodeFromRestApp1 = 0;
        try {
            restApp1.restClient.updateCompatibility(AvroCompatibilityLevel.FORWARD.name, "configTopic");
            Assert.fail((String)"Update config should fail on the master");
        }
        catch (RestClientException e) {
            updateConfigStatusCodeFromRestApp1 = e.getStatus();
        }
        int updateConfigStatusCodeFromRestApp2 = 0;
        try {
            restApp2.restClient.updateCompatibility(AvroCompatibilityLevel.FORWARD.name, "configTopic");
            Assert.fail((String)"Update config should fail on the non-master");
        }
        catch (RestClientException e) {
            updateConfigStatusCodeFromRestApp2 = e.getStatus();
        }
        Assert.assertEquals((String)"Status code from a non-master rest app for update config should be 500", (long)500L, (long)updateConfigStatusCodeFromRestApp1);
        Assert.assertEquals((String)"Error code from the master and the non-master should be the same", (long)updateConfigStatusCodeFromRestApp1, (long)updateConfigStatusCodeFromRestApp2);
        Assert.assertEquals((String)"Registering an existing schema to the non-master should return its id", (long)2L, (long)restApp2.restClient.registerSchema(secondSchema, "testTopic"));
        restApp1.setMaster(restApp1.myIdentity());
        String thirdSchema = avroSchemas.get(2);
        int thirdSchemaExpectedVersion = 3;
        int thirdSchemaExpectedId = this.electorType == "zookeeper" ? this.reservationBatchSize + 1 : 3;
        Assert.assertEquals((String)"Registering a new schema to the master should succeed", (long)thirdSchemaExpectedId, (long)restApp1.restClient.registerSchema(thirdSchema, "testTopic"));
        restApp1.stop();
        Callable<Boolean> condition = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return restApp2.isMaster();
            }
        };
        TestUtils.waitUntilTrue(condition, 15000L, "Schema registry instance 2 should become the master");
        Assert.assertEquals((String)"Latest version should be found on the new master", (Object)thirdSchema, (Object)restApp2.restClient.getId(thirdSchemaExpectedId).getSchemaString());
        Assert.assertEquals((String)"Latest version should be found on the new master", (Object)thirdSchema, (Object)restApp2.restClient.getVersion("testTopic", 3).getSchema());
        String fourthSchema = avroSchemas.get(3);
        int fourthSchemaExpectedId = this.electorType == "zookeeper" ? 2 * this.reservationBatchSize + 1 : thirdSchemaExpectedId + 1;
        TestUtils.registerAndVerifySchema(restApp2.restClient, fourthSchema, fourthSchemaExpectedId, "testTopic");
        restApp2.stop();
    }

    @Test
    public void testSlaveIsNeverMaster() throws Exception {
        int numSlaves = 2;
        int numMasters = 30;
        HashSet<RestApp> slaveApps = new HashSet<RestApp>();
        RestApp aSlave = null;
        for (int i = 0; i < numSlaves; ++i) {
            RestApp slave = new RestApp(MasterElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", AvroCompatibilityLevel.NONE.name, false, null);
            slaveApps.add(slave);
            slave.start();
            aSlave = slave;
        }
        Assert.assertNotNull(aSlave);
        for (RestApp slave : slaveApps) {
            Assert.assertFalse((String)"No slave should be master.", (boolean)slave.isMaster());
            Assert.assertNull((String)"No master should be present in a slave cluster.", (Object)slave.masterIdentity());
        }
        try {
            aSlave.setMaster(aSlave.myIdentity());
        }
        catch (IllegalStateException i) {
            // empty catch block
        }
        Assert.assertFalse((String)"Should not be able to set a slave to be master.", (boolean)aSlave.isMaster());
        Assert.assertNull((String)"There should be no master present.", (Object)aSlave.masterIdentity());
        HashSet<RestApp> masterApps = new HashSet<RestApp>();
        for (int i = 0; i < numMasters; ++i) {
            RestApp master = new RestApp(MasterElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", AvroCompatibilityLevel.NONE.name, true, null);
            masterApps.add(master);
            master.start();
            this.waitUntilMasterElectionCompletes(masterApps);
        }
        while (masterApps.size() > 0) {
            RestApp reportedMaster = MasterElectorTest.checkOneMaster(masterApps);
            masterApps.remove(reportedMaster);
            MasterElectorTest.checkMasterIdentity(slaveApps, reportedMaster.myIdentity());
            MasterElectorTest.checkMasterIdentity(masterApps, reportedMaster.myIdentity());
            MasterElectorTest.checkNoneIsMaster(slaveApps);
            reportedMaster.stop();
            this.waitUntilMasterElectionCompletes(masterApps);
        }
        MasterElectorTest.checkNoneIsMaster(slaveApps);
        MasterElectorTest.checkNoneIsMaster(masterApps);
        for (RestApp slave : slaveApps) {
            slave.stop();
        }
    }

    @Test
    public void testRegistrationOnMasterSlaveClusters() throws Exception {
        int id;
        Object master;
        int numSlaves = 4;
        int numMasters = 4;
        int numSchemas = 5;
        String subject = "testSubject";
        List<String> schemas = TestUtils.getRandomCanonicalAvroString(numSchemas);
        ArrayList<Integer> ids = new ArrayList<Integer>();
        HashSet<RestApp> slaveApps = new HashSet<RestApp>();
        RestApp aSlave = null;
        for (int i = 0; i < numSlaves; ++i) {
            RestApp slave = new RestApp(MasterElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", AvroCompatibilityLevel.NONE.name, false, null);
            slaveApps.add(slave);
            slave.start();
            aSlave = slave;
        }
        Assert.assertNotNull(aSlave);
        boolean successfullyRegistered = false;
        try {
            aSlave.restClient.registerSchema(schemas.get(0), subject);
            successfullyRegistered = true;
        }
        catch (RestClientException slave) {
            // empty catch block
        }
        Assert.assertFalse((String)"Should not be possible to register with no masters present.", (boolean)successfullyRegistered);
        HashSet<RestApp> masterApps = new HashSet<RestApp>();
        RestApp aMaster = null;
        for (int i = 0; i < numMasters; ++i) {
            master = new RestApp(MasterElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", AvroCompatibilityLevel.NONE.name, true, null);
            masterApps.add((RestApp)master);
            ((RestApp)master).start();
            aMaster = master;
        }
        Assert.assertNotNull(aMaster);
        try {
            for (String schema : schemas) {
                ids.add(aMaster.restClient.registerSchema(schema, subject));
            }
        }
        catch (RestClientException e) {
            Assert.fail((String)"It should be possible to register schemas when a master cluster is present.");
        }
        String anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        try {
            ids.add(aSlave.restClient.registerSchema(anotherSchema, subject));
        }
        catch (RestClientException e) {
            Assert.fail((String)"Should be possible register a schema through slave cluster.");
        }
        try {
            Iterator e = ids.iterator();
            while (e.hasNext()) {
                id = (Integer)e.next();
                this.waitUntilIdExists(aSlave.restClient, id, String.format("Should be possible to fetch id %d from this slave.", id));
                this.waitUntilIdExists(aMaster.restClient, id, String.format("Should be possible to fetch id %d from this master.", id));
                SchemaString slaveResponse = aSlave.restClient.getId(id);
                SchemaString masterResponse = aMaster.restClient.getId(id);
                Assert.assertEquals((String)"Master and slave responded with different schemas when queried with the same id.", (Object)slaveResponse.getSchemaString(), (Object)masterResponse.getSchemaString());
            }
        }
        catch (RestClientException e) {
            Assert.fail((String)"Expected ids were not found in the schema registry.");
        }
        while (masterApps.size() > 0) {
            master = MasterElectorTest.findMaster(masterApps);
            masterApps.remove(master);
            ((RestApp)master).stop();
            this.waitUntilMasterElectionCompletes(masterApps);
        }
        anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        successfullyRegistered = false;
        try {
            aSlave.restClient.registerSchema(anotherSchema, subject);
            successfullyRegistered = true;
        }
        catch (RestClientException master2) {
            // empty catch block
        }
        Assert.assertFalse((String)"Should not be possible to register with no masters present.", (boolean)successfullyRegistered);
        try {
            master = ids.iterator();
            while (master.hasNext()) {
                id = (Integer)master.next();
                SchemaString schemaString = aSlave.restClient.getId(id);
            }
            List versions = aSlave.restClient.getAllVersions(subject);
            Assert.assertEquals((String)"Number of ids should match number of versions.", (long)ids.size(), (long)versions.size());
        }
        catch (RestClientException e) {
            Assert.fail((String)"Should be possible to fetch registered schemas even with no masters present.");
        }
        for (RestApp slave : slaveApps) {
            slave.stop();
        }
    }

    private static RestApp findMaster(Collection<RestApp> cluster) {
        for (RestApp restApp : cluster) {
            if (!restApp.isMaster()) continue;
            return restApp;
        }
        return null;
    }

    private static void checkNoneIsMaster(Collection<RestApp> cluster) {
        Assert.assertNull((String)"Expected none of the nodes in this cluster to report itself as master.", (Object)MasterElectorTest.findMaster(cluster));
    }

    private static void checkMasterIdentity(Collection<RestApp> cluster, SchemaRegistryIdentity expectedMasterIdentity) {
        for (RestApp restApp : cluster) {
            for (int i = 0; i < 3; ++i) {
                SchemaRegistryIdentity masterIdentity = restApp.masterIdentity();
                if (masterIdentity == null) {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException interruptedException) {}
                    continue;
                }
                Assert.assertEquals((String)("Each master identity should be " + expectedMasterIdentity), (Object)expectedMasterIdentity, (Object)masterIdentity);
            }
        }
    }

    private static Set<SchemaRegistryIdentity> getMasterIdentities(Collection<RestApp> cluster) {
        HashSet<SchemaRegistryIdentity> masterIdentities = new HashSet<SchemaRegistryIdentity>();
        for (RestApp app : cluster) {
            if (app == null || app.masterIdentity() == null) continue;
            masterIdentities.add(app.masterIdentity());
        }
        return masterIdentities;
    }

    private static RestApp checkOneMaster(Collection<RestApp> cluster) {
        int masterCount = 0;
        RestApp master = null;
        for (RestApp restApp : cluster) {
            if (!restApp.isMaster()) continue;
            ++masterCount;
            master = restApp;
        }
        Assert.assertEquals((String)("Expected one master but found " + masterCount), (long)1L, (long)masterCount);
        return master;
    }

    private void waitUntilMasterElectionCompletes(final Collection<RestApp> cluster) {
        if (cluster == null || cluster.size() == 0) {
            return;
        }
        Callable<Boolean> newMasterElected = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                boolean hasMaster = MasterElectorTest.findMaster(cluster) != null;
                boolean oneReportedMaster = MasterElectorTest.getMasterIdentities(cluster).size() == 1;
                return hasMaster && oneReportedMaster;
            }
        };
        TestUtils.waitUntilTrue(newMasterElected, 15000L, "A node should have been elected master by now.");
    }

    private void waitUntilIdExists(final RestService restService, final int expectedId, String errorMsg) {
        Callable<Boolean> canGetSchemaById = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                try {
                    restService.getId(expectedId);
                    return true;
                }
                catch (RestClientException e) {
                    return false;
                }
            }
        };
        TestUtils.waitUntilTrue(canGetSchemaById, 15000L, errorMsg);
    }

    private void waitUntilCompatibilityLevelSet(final RestService restService, final String subject, final String expectedCompatibilityLevel, String errorMsg) {
        Callable<Boolean> canGetSchemaById = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                try {
                    String actualCompatibilityLevel = restService.getConfig(subject).getCompatibilityLevel();
                    return expectedCompatibilityLevel.compareTo(actualCompatibilityLevel) == 0;
                }
                catch (RestClientException e) {
                    return false;
                }
            }
        };
        TestUtils.waitUntilTrue(canGetSchemaById, 15000L, errorMsg);
    }

    private void verifyIdAndSchema(RestService restService, int expectedId, String expectedSchemaString, String errMsg) {
        this.waitUntilIdExists(restService, expectedId, errMsg);
        String schemaString = null;
        try {
            schemaString = restService.getId(expectedId).getSchemaString();
        }
        catch (IOException e) {
            Assert.fail((String)errMsg);
        }
        catch (RestClientException e) {
            Assert.fail((String)errMsg);
        }
        Assert.assertEquals((String)errMsg, (Object)expectedSchemaString, (Object)schemaString);
    }
}

