/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry;

import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.storage.Mode;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class LeaderElectorTest
extends ClusterTestHarness {
    /**
     * Leader-election backend exercised by the current run. The helper methods
     * {@code zkConnect()} and {@code bootstrapServers()} accept only "zookeeper"
     * or "kafka" and throw {@link IllegalArgumentException} for anything else.
     * Injected by the Parameterized runner (parameter index 0).
     */
    @Parameterized.Parameter(value=0)
    public String electorType;
    /**
     * Value injected by the Parameterized runner (parameter index 1). In
     * {@code testAutoFailover} it is used to compute the schema ids expected
     * when the elector type is "zookeeper".
     * NOTE(review): presumably the size of the id batch reserved per leader - confirm.
     */
    @Parameterized.Parameter(value=1)
    public int reservationBatchSize;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"kafka", 0}, {"zookeeper", 20});
    }

    /**
     * Picks the ZooKeeper connect string handed to each {@code RestApp}.
     *
     * @return the {@code zkConnect} field (not declared in this class; presumably
     *         inherited from the test harness) when the elector type is
     *         "zookeeper", or {@code null} when it is "kafka"
     * @throws IllegalArgumentException if the elector type is neither of the two
     */
    private String zkConnect() {
        final String type = this.electorType;
        if (type.equals("zookeeper")) {
            return this.zkConnect;
        }
        if (type.equals("kafka")) {
            return null;
        }
        throw new IllegalArgumentException();
    }

    /**
     * Picks the Kafka bootstrap servers string handed to each {@code RestApp}.
     *
     * @return the {@code bootstrapServers} field (not declared in this class;
     *         presumably inherited from the test harness) when the elector type
     *         is "kafka", or {@code null} when it is "zookeeper"
     * @throws IllegalArgumentException if the elector type is neither of the two
     */
    private String bootstrapServers() {
        final String type = this.electorType;
        if (type.equals("zookeeper")) {
            return null;
        }
        if (type.equals("kafka")) {
            return this.bootstrapServers;
        }
        throw new IllegalArgumentException();
    }

    /**
     * Runs two leader-eligible instances through three phases:
     * <ol>
     *   <li>normal operation - schemas and compatibility settings written through
     *       either instance become visible on both;</li>
     *   <li>no leader - after instance 1's leader is cleared, new registrations and
     *       config updates fail with status 500 on both instances while
     *       re-registering an existing schema still returns its id;</li>
     *   <li>fail-over - after instance 1 is stopped, instance 2 becomes the leader,
     *       serves the previously registered schemas and accepts new ones.</li>
     * </ol>
     *
     * @throws Exception if any instance fails to start/stop or a REST call fails unexpectedly
     */
    @Test
    public void testAutoFailover() throws Exception {
        final String subject = "testTopic";
        final String configSubject = "configTopic";
        final List<String> avroSchemas = TestUtils.getRandomCanonicalAvroString(4);

        // Make sure instance 1 is the one bound to the smaller port.
        // NOTE(review): presumably the election favours the lower port, which is why
        // instance 1 is asserted to be the leader below - confirm.
        int port1 = LeaderElectorTest.choosePort();
        int port2 = LeaderElectorTest.choosePort();
        if (port2 < port1) {
            int tmp = port2;
            port2 = port1;
            port1 = tmp;
        }
        final RestApp restApp1 = new RestApp(port1, this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, true, null);
        restApp1.start();
        // final: captured by the anonymous Callable used to wait for fail-over.
        final RestApp restApp2 = new RestApp(port2, this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, true, null);
        restApp2.start();
        Assert.assertTrue("Schema registry instance 1 should be the leader", restApp1.isLeader());
        Assert.assertFalse("Schema registry instance 2 shouldn't be the leader", restApp2.isLeader());
        Assert.assertEquals("Instance 2's leader should be instance 1", restApp1.myIdentity(), restApp2.leaderIdentity());

        // Phase 1a: register through the leader, read back through the non-leader.
        final String firstSchema = avroSchemas.get(0);
        final int firstSchemaExpectedId = 1;
        TestUtils.registerAndVerifySchema(restApp1.restClient, firstSchema, firstSchemaExpectedId, subject);
        this.verifyIdAndSchema(restApp2.restClient, firstSchemaExpectedId, firstSchema, "Registered schema should be found on the non-leader");

        // Phase 1b: register through the non-leader, read back through both.
        final String secondSchema = avroSchemas.get(1);
        final int secondSchemaExpectedId = 2;
        final int secondSchemaExpectedVersion = 2;
        Assert.assertEquals("Registering a new schema to the non-leader should succeed", secondSchemaExpectedId, restApp2.restClient.registerSchema(secondSchema, subject));
        Assert.assertEquals("Registered schema should be found on the leader", secondSchema, restApp1.restClient.getId(secondSchemaExpectedId).getSchemaString());
        Assert.assertEquals("Registered schema should be found on the leader", secondSchema, restApp1.restClient.getVersion(subject, secondSchemaExpectedVersion).getSchema());
        this.verifyIdAndSchema(restApp2.restClient, secondSchemaExpectedId, secondSchema, "Registered schema should be found on the non-leader");
        Assert.assertEquals("Registering an existing schema to the leader should return its id", secondSchemaExpectedId, restApp1.restClient.registerSchema(secondSchema, subject));
        Assert.assertEquals("Registering an existing schema to the non-leader should return its id", secondSchemaExpectedId, restApp2.restClient.registerSchema(secondSchema, subject));

        // Phase 1c: compatibility updates through either instance reach both.
        restApp1.restClient.updateCompatibility(CompatibilityLevel.FORWARD.name, configSubject);
        Assert.assertEquals("New compatibility level should be FORWARD on the leader", CompatibilityLevel.FORWARD.name, restApp1.restClient.getConfig(configSubject).getCompatibilityLevel());
        this.waitUntilCompatibilityLevelSet(restApp2.restClient, configSubject, CompatibilityLevel.FORWARD.name, "New compatibility level should be FORWARD on the non-leader");
        restApp2.restClient.updateCompatibility(CompatibilityLevel.NONE.name, configSubject);
        Assert.assertEquals("New compatibility level should be NONE on the leader", CompatibilityLevel.NONE.name, restApp1.restClient.getConfig(configSubject).getCompatibilityLevel());
        this.waitUntilCompatibilityLevelSet(restApp2.restClient, configSubject, CompatibilityLevel.NONE.name, "New compatibility level should be NONE on the non-leader");

        // Phase 2: clear the leader on instance 1; writes must now fail on both instances.
        restApp1.setLeader(null);
        final String failedSchema = "{\"type\":\"string\"}";
        int statusCodeFromRestApp1 = 0;
        try {
            restApp1.restClient.registerSchema(failedSchema, subject);
            Assert.fail("Registration should fail on the leader");
        }
        catch (RestClientException e) {
            statusCodeFromRestApp1 = e.getStatus();
        }
        int statusCodeFromRestApp2 = 0;
        try {
            restApp2.restClient.registerSchema(failedSchema, subject);
            Assert.fail("Registration should fail on the non-leader");
        }
        catch (RestClientException e) {
            statusCodeFromRestApp2 = e.getStatus();
        }
        Assert.assertEquals("Status code from a non-leader rest app for register schema should be 500", 500, statusCodeFromRestApp1);
        Assert.assertEquals("Error code from the leader and the non-leader should be the same", statusCodeFromRestApp1, statusCodeFromRestApp2);

        int updateConfigStatusCodeFromRestApp1 = 0;
        try {
            restApp1.restClient.updateCompatibility(CompatibilityLevel.FORWARD.name, configSubject);
            Assert.fail("Update config should fail on the leader");
        }
        catch (RestClientException e) {
            updateConfigStatusCodeFromRestApp1 = e.getStatus();
        }
        int updateConfigStatusCodeFromRestApp2 = 0;
        try {
            restApp2.restClient.updateCompatibility(CompatibilityLevel.FORWARD.name, configSubject);
            Assert.fail("Update config should fail on the non-leader");
        }
        catch (RestClientException e) {
            updateConfigStatusCodeFromRestApp2 = e.getStatus();
        }
        Assert.assertEquals("Status code from a non-leader rest app for update config should be 500", 500, updateConfigStatusCodeFromRestApp1);
        Assert.assertEquals("Error code from the leader and the non-leader should be the same", updateConfigStatusCodeFromRestApp1, updateConfigStatusCodeFromRestApp2);

        // Re-registering an already known schema does not need a leader.
        Assert.assertEquals("Registering an existing schema to the non-leader should return its id", secondSchemaExpectedId, restApp2.restClient.registerSchema(secondSchema, subject));

        // Phase 3: restore the leader, register once more, then stop it to force fail-over.
        restApp1.setLeader(restApp1.myIdentity());
        final String thirdSchema = avroSchemas.get(2);
        final int thirdSchemaExpectedVersion = 3;
        // Compare by value: '==' on Strings only works when both operands happen to be
        // the same interned instance.
        final boolean zookeeperElector = "zookeeper".equals(this.electorType);
        final int thirdSchemaExpectedId = zookeeperElector ? this.reservationBatchSize + 1 : secondSchemaExpectedId + 1;
        Assert.assertEquals("Registering a new schema to the leader should succeed", thirdSchemaExpectedId, restApp1.restClient.registerSchema(thirdSchema, subject));
        restApp1.stop();
        Callable<Boolean> condition = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return restApp2.isLeader();
            }
        };
        TestUtils.waitUntilTrue(condition, 15000L, "Schema registry instance 2 should become the leader");
        Assert.assertEquals("Latest version should be found on the new leader", thirdSchema, restApp2.restClient.getId(thirdSchemaExpectedId).getSchemaString());
        Assert.assertEquals("Latest version should be found on the new leader", thirdSchema, restApp2.restClient.getVersion(subject, thirdSchemaExpectedVersion).getSchema());

        final String fourthSchema = avroSchemas.get(3);
        final int fourthSchemaExpectedId = zookeeperElector ? 2 * this.reservationBatchSize + 1 : thirdSchemaExpectedId + 1;
        TestUtils.registerAndVerifySchema(restApp2.restClient, fourthSchema, fourthSchemaExpectedId, subject);
        restApp2.stop();
    }

    /**
     * Checks that instances started as followers (leader-eligibility flag
     * {@code false}) never report themselves as leader: not while alone, not
     * after an explicit {@code setLeader} attempt, and not while leader-eligible
     * instances are started and then stopped one by one.
     *
     * @throws Exception if an instance fails to start or stop
     */
    @Test
    public void testFollowerIsNeverLeader() throws Exception {
        final int followerCount = 2;
        final int leaderCount = 30;

        // Bring up the follower-only cluster and remember the last instance started.
        HashSet<RestApp> followerApps = new HashSet<RestApp>();
        RestApp aFollower = null;
        for (int n = 0; n < followerCount; ++n) {
            RestApp started = new RestApp(LeaderElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, false, null);
            followerApps.add(started);
            started.start();
            aFollower = started;
        }
        Assert.assertNotNull(aFollower);

        for (RestApp app : followerApps) {
            Assert.assertFalse("No follower should be leader.", app.isLeader());
            Assert.assertNull("No follower should be present in a follower cluster.", app.leaderIdentity());
        }

        // Forcing a follower to lead may be rejected with an exception; either way
        // the follower must not end up as leader.
        try {
            aFollower.setLeader(aFollower.myIdentity());
        }
        catch (IllegalStateException expected) {
            // tolerated: the assertions below verify the outcome
        }
        Assert.assertFalse("Should not be able to set a follower to be leader.", aFollower.isLeader());
        Assert.assertNull("There should be no leader present.", aFollower.leaderIdentity());

        // Add leader-eligible instances one at a time, letting each election settle.
        HashSet<RestApp> leaderApps = new HashSet<RestApp>();
        for (int n = 0; n < leaderCount; ++n) {
            RestApp eligible = new RestApp(LeaderElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, true, null);
            leaderApps.add(eligible);
            eligible.start();
            TestUtils.waitUntilLeaderElectionCompletes(leaderApps);
        }

        // Repeatedly stop the current leader until no leader-eligible instance remains.
        while (!leaderApps.isEmpty()) {
            RestApp currentLeader = TestUtils.checkOneLeader(leaderApps);
            leaderApps.remove(currentLeader);
            LeaderElectorTest.checkLeaderIdentity(followerApps, currentLeader.myIdentity());
            LeaderElectorTest.checkLeaderIdentity(leaderApps, currentLeader.myIdentity());
            LeaderElectorTest.checkNoneIsLeader(followerApps);
            currentLeader.stop();
            TestUtils.waitUntilLeaderElectionCompletes(leaderApps);
        }

        LeaderElectorTest.checkNoneIsLeader(followerApps);
        LeaderElectorTest.checkNoneIsLeader(leaderApps);
        for (RestApp app : followerApps) {
            app.stop();
        }
    }

    /**
     * Verifies schema registration across a follower cluster and a
     * leader-eligible cluster:
     * <ul>
     *   <li>with only followers running, registration is rejected;</li>
     *   <li>once leader-eligible instances run, registration succeeds through a
     *       leader and through a follower, and both return the same schema for
     *       every id;</li>
     *   <li>after all leader-eligible instances are stopped, registration is
     *       rejected again but previously registered schemas stay readable
     *       from the follower.</li>
     * </ul>
     *
     * @throws Exception if an instance fails to start/stop or an unexpected I/O error occurs
     */
    @Test
    public void testRegistrationOnLeaderFollowerClusters() throws Exception {
        int numFollowers = 4;
        int numLeaders = 4;
        int numSchemas = 5;
        String subject = "testSubject";
        List<String> schemas = TestUtils.getRandomCanonicalAvroString(numSchemas);
        List<Integer> ids = new ArrayList<Integer>();

        // Start the follower-only cluster; keep a handle on the last instance started.
        HashSet<RestApp> followerApps = new HashSet<RestApp>();
        RestApp aFollower = null;
        for (int i = 0; i < numFollowers; ++i) {
            RestApp follower = new RestApp(LeaderElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, false, null);
            followerApps.add(follower);
            follower.start();
            aFollower = follower;
        }
        Assert.assertNotNull(aFollower);

        // No leader yet: the registration attempt is expected to be rejected.
        boolean successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(schemas.get(0), subject);
            successfullyRegistered = true;
        }
        catch (RestClientException expected) {
            // rejection is the expected outcome; the flag stays false
        }
        Assert.assertFalse("Should not be possible to register with no leaders present.", successfullyRegistered);

        // Start the leader-eligible cluster; keep a handle on the last instance started.
        HashSet<RestApp> leaderApps = new HashSet<RestApp>();
        RestApp aLeader = null;
        for (int i = 0; i < numLeaders; ++i) {
            RestApp leader = new RestApp(LeaderElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, true, null);
            leaderApps.add(leader);
            leader.start();
            aLeader = leader;
        }
        Assert.assertNotNull(aLeader);

        try {
            for (String schema : schemas) {
                ids.add(aLeader.restClient.registerSchema(schema, subject));
            }
        }
        catch (RestClientException e) {
            Assert.fail("It should be possible to register schemas when a leader cluster is present.");
        }
        String anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        try {
            ids.add(aFollower.restClient.registerSchema(anotherSchema, subject));
        }
        catch (RestClientException e) {
            Assert.fail("Should be possible register a schema through follower cluster.");
        }

        // Every id must become visible on both sides and resolve to the same schema.
        try {
            for (int id : ids) {
                this.waitUntilIdExists(aFollower.restClient, id, String.format("Should be possible to fetch id %d from this follower.", id));
                this.waitUntilIdExists(aLeader.restClient, id, String.format("Should be possible to fetch id %d from this leader.", id));
                SchemaString followerResponse = aFollower.restClient.getId(id);
                SchemaString leaderResponse = aLeader.restClient.getId(id);
                Assert.assertEquals("Leader and follower responded with different schemas when queried with the same id.", followerResponse.getSchemaString(), leaderResponse.getSchemaString());
            }
        }
        catch (RestClientException e) {
            Assert.fail("Expected ids were not found in the schema registry.");
        }

        // Stop the leader-eligible instances one at a time, always the current leader first.
        while (!leaderApps.isEmpty()) {
            RestApp currentLeader = TestUtils.findLeader(leaderApps);
            leaderApps.remove(currentLeader);
            currentLeader.stop();
            TestUtils.waitUntilLeaderElectionCompletes(leaderApps);
        }

        // Leaderless again: writes are rejected ...
        anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(anotherSchema, subject);
            successfullyRegistered = true;
        }
        catch (RestClientException expected) {
            // rejection is the expected outcome; the flag stays false
        }
        Assert.assertFalse("Should not be possible to register with no leaders present.", successfullyRegistered);

        // ... but reads of everything registered earlier still succeed.
        try {
            for (int id : ids) {
                // Only the absence of an exception matters here; the result is discarded.
                aFollower.restClient.getId(id);
            }
            int versionCount = aFollower.restClient.getAllVersions(subject).size();
            Assert.assertEquals("Number of ids should match number of versions.", ids.size(), versionCount);
        }
        catch (RestClientException e) {
            Assert.fail("Should be possible to fetch registered schemas even with no leaders present.");
        }
        for (RestApp follower : followerApps) {
            follower.stop();
        }
    }

    /**
     * Same scenario as the plain registration test, but every instance is
     * started with {@code mode.mutability=true}, the registry is switched to
     * {@code Mode.IMPORT} through a leader, and each schema is registered with
     * an explicit version and id (starting at version 100 / id 100000 and
     * incremented on every registration attempt, including rejected ones).
     *
     * @throws Exception if an instance fails to start/stop or an unexpected I/O error occurs
     */
    @Test
    public void testImportOnLeaderFollowerClusters() throws Exception {
        int numFollowers = 4;
        int numLeaders = 4;
        int numSchemas = 5;
        String subject = "testSubject";
        List<String> schemas = TestUtils.getRandomCanonicalAvroString(numSchemas);
        List<Integer> ids = new ArrayList<Integer>();
        Properties props = new Properties();
        props.setProperty("mode.mutability", "true");
        int newId = 100000;
        int newVersion = 100;

        // Start the follower-only cluster; keep a handle on the last instance started.
        HashSet<RestApp> followerApps = new HashSet<RestApp>();
        RestApp aFollower = null;
        for (int i = 0; i < numFollowers; ++i) {
            RestApp follower = new RestApp(LeaderElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, false, props);
            followerApps.add(follower);
            follower.start();
            aFollower = follower;
        }
        Assert.assertNotNull(aFollower);

        // No leader yet: the registration attempt is expected to be rejected.
        boolean successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(schemas.get(0), subject, newVersion++, newId++);
            successfullyRegistered = true;
        }
        catch (RestClientException expected) {
            // rejection is the expected outcome; the flag stays false
        }
        Assert.assertFalse("Should not be possible to register with no leaders present.", successfullyRegistered);

        // Start the leader-eligible cluster; keep a handle on the last instance started.
        HashSet<RestApp> leaderApps = new HashSet<RestApp>();
        RestApp aLeader = null;
        for (int i = 0; i < numLeaders; ++i) {
            RestApp leader = new RestApp(LeaderElectorTest.choosePort(), this.zkConnect(), this.bootstrapServers(), "", CompatibilityLevel.NONE.name, true, props);
            leaderApps.add(leader);
            leader.start();
            aLeader = leader;
        }
        Assert.assertNotNull(aLeader);

        try {
            aLeader.restClient.setMode(Mode.IMPORT.toString());
        }
        catch (RestClientException e) {
            Assert.fail("It should be possible to set mode when a leader cluster is present.");
        }
        try {
            for (String schema : schemas) {
                ids.add(aLeader.restClient.registerSchema(schema, subject, newVersion++, newId++));
            }
        }
        catch (RestClientException e) {
            Assert.fail("It should be possible to register schemas when a leader cluster is present. Error: " + e.getMessage());
        }
        String anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        try {
            ids.add(aFollower.restClient.registerSchema(anotherSchema, subject, newVersion++, newId++));
        }
        catch (RestClientException e) {
            Assert.fail("Should be possible register a schema through follower cluster.");
        }

        // Every id must become visible on both sides and resolve to the same schema.
        try {
            for (int id : ids) {
                this.waitUntilIdExists(aFollower.restClient, id, String.format("Should be possible to fetch id %d from this follower.", id));
                this.waitUntilIdExists(aLeader.restClient, id, String.format("Should be possible to fetch id %d from this leader.", id));
                SchemaString followerResponse = aFollower.restClient.getId(id);
                SchemaString leaderResponse = aLeader.restClient.getId(id);
                Assert.assertEquals("Leader and follower responded with different schemas when queried with the same id.", followerResponse.getSchemaString(), leaderResponse.getSchemaString());
            }
        }
        catch (RestClientException e) {
            Assert.fail("Expected ids were not found in the schema registry.");
        }

        // Stop the leader-eligible instances one at a time, always the current leader first.
        while (!leaderApps.isEmpty()) {
            RestApp currentLeader = TestUtils.findLeader(leaderApps);
            leaderApps.remove(currentLeader);
            currentLeader.stop();
            TestUtils.waitUntilLeaderElectionCompletes(leaderApps);
        }

        // Leaderless again: writes are rejected ...
        anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(anotherSchema, subject, newVersion++, newId++);
            successfullyRegistered = true;
        }
        catch (RestClientException expected) {
            // rejection is the expected outcome; the flag stays false
        }
        Assert.assertFalse("Should not be possible to register with no leaders present.", successfullyRegistered);

        // ... but reads of everything registered earlier still succeed.
        try {
            for (int id : ids) {
                // Only the absence of an exception matters here; the result is discarded.
                aFollower.restClient.getId(id);
            }
            int versionCount = aFollower.restClient.getAllVersions(subject).size();
            Assert.assertEquals("Number of ids should match number of versions.", ids.size(), versionCount);
        }
        catch (RestClientException e) {
            Assert.fail("Should be possible to fetch registered schemas even with no leaders present.");
        }
        for (RestApp follower : followerApps) {
            follower.stop();
        }
    }

    /**
     * Asserts that {@code TestUtils.findLeader} yields {@code null} for the given cluster.
     *
     * @param cluster the instances to inspect
     */
    private static void checkNoneIsLeader(Collection<RestApp> cluster) {
        final String failureMessage = "Expected none of the nodes in this cluster to report itself as leader.";
        Assert.assertNull(failureMessage, TestUtils.findLeader(cluster));
    }

    /**
     * Asserts that every instance in {@code cluster} reports
     * {@code expectedLeaderIdentity} as its leader.
     *
     * <p>An instance may briefly report no leader, so a {@code null} identity is
     * re-read up to three times in total, one second apart. Whatever value is
     * held after the retries is then asserted, so an instance that never learns
     * its leader fails the check instead of being skipped silently.
     *
     * @param cluster the instances to inspect
     * @param expectedLeaderIdentity the identity each instance should report
     */
    private static void checkLeaderIdentity(Collection<RestApp> cluster, SchemaRegistryIdentity expectedLeaderIdentity) {
        final int maxAttempts = 3;
        final long retryDelayMs = 1000L;
        for (RestApp restApp : cluster) {
            SchemaRegistryIdentity leaderIdentity = restApp.leaderIdentity();
            for (int attempt = 1; leaderIdentity == null && attempt < maxAttempts; ++attempt) {
                try {
                    Thread.sleep(retryDelayMs);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop waiting;
                    // the assertion below reports the state observed so far.
                    Thread.currentThread().interrupt();
                    break;
                }
                leaderIdentity = restApp.leaderIdentity();
            }
            Assert.assertEquals("Each leader identity should be " + expectedLeaderIdentity, expectedLeaderIdentity, leaderIdentity);
        }
    }

    /**
     * Polls via {@code TestUtils.waitUntilTrue} (timeout argument 15000) until
     * {@code restService.getId(expectedId)} stops throwing
     * {@link RestClientException}.
     *
     * @param restService client used to look the id up
     * @param expectedId schema id that should become fetchable
     * @param errorMsg message handed to {@code waitUntilTrue}
     */
    private void waitUntilIdExists(final RestService restService, final int expectedId, String errorMsg) {
        TestUtils.waitUntilTrue(new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                boolean fetchable;
                try {
                    restService.getId(expectedId);
                    fetchable = true;
                }
                catch (RestClientException notVisibleYet) {
                    // A REST-level error counts as "not there yet"; other exceptions propagate.
                    fetchable = false;
                }
                return fetchable;
            }
        }, 15000L, errorMsg);
    }

    /**
     * Polls via {@code TestUtils.waitUntilTrue} (timeout argument 15000) until
     * the compatibility level reported for {@code subject} equals
     * {@code expectedCompatibilityLevel}.
     *
     * <p>A {@link RestClientException} or a {@code null} reported level counts as
     * "not set yet" and keeps the wait going; other exceptions propagate out of
     * the polled condition.
     *
     * @param restService client used to read the config
     * @param subject subject whose config is read
     * @param expectedCompatibilityLevel level that should eventually be reported
     * @param errorMsg message handed to {@code waitUntilTrue}
     */
    private void waitUntilCompatibilityLevelSet(final RestService restService, final String subject, final String expectedCompatibilityLevel, String errorMsg) {
        Callable<Boolean> compatibilityLevelMatches = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                try {
                    String actualCompatibilityLevel = restService.getConfig(subject).getCompatibilityLevel();
                    // equals() rather than compareTo(): a null level yields false instead of an NPE.
                    return expectedCompatibilityLevel.equals(actualCompatibilityLevel);
                }
                catch (RestClientException e) {
                    return false;
                }
            }
        };
        TestUtils.waitUntilTrue(compatibilityLevelMatches, 15000L, errorMsg);
    }

    /**
     * Waits for {@code expectedId} to become fetchable, then asserts that the
     * schema string stored under it equals {@code expectedSchemaString}.
     *
     * @param restService client used for the lookup
     * @param expectedId schema id to fetch
     * @param expectedSchemaString schema text expected under that id
     * @param errMsg message used for the wait, for a failed fetch and for a mismatch
     */
    private void verifyIdAndSchema(RestService restService, int expectedId, String expectedSchemaString, String errMsg) {
        this.waitUntilIdExists(restService, expectedId, errMsg);
        String fetched = null;
        try {
            fetched = restService.getId(expectedId).getSchemaString();
        }
        catch (IOException | RestClientException fetchFailure) {
            // Either failure is reported the same way.
            Assert.fail(errMsg);
        }
        Assert.assertEquals(errMsg, expectedSchemaString, fetched);
    }
}

