/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.masterelector.zookeeper;

import io.confluent.common.utils.zookeeper.ZkUtils;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.storage.serialization.ZkStringSerializer;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration tests that observe the schema id counter stored in ZooKeeper at
 * {@link #ZK_ID_COUNTER_PATH} while one or more {@link RestApp} instances are
 * started, stopped and asked to register schemas.
 *
 * <p>Every {@link RestApp} and every locally created {@link ZkClient} is released in a
 * {@code finally} block so that a failing assertion does not leave servers or
 * ZooKeeper sessions behind for the next test.
 */
public class ZookeeperMasterElectorTest
extends ClusterTestHarness {
    /** Increment by which these tests expect the ZooKeeper id counter to grow. */
    private static final int ID_BATCH_SIZE = 20;
    /** ZooKeeper node whose payload is parsed as the id counter by {@link #getZkIdCounter}. */
    private static final String ZK_ID_COUNTER_PATH = "/schema_registry/schema_id_counter";
    /** Value passed as both the session and the connection timeout of locally created ZooKeeper clients. */
    private static final int ZK_TIMEOUT_MS = 10000;
    /** Timeout handed to {@code TestUtils.waitUntilTrue}; presumably milliseconds (confirm against TestUtils). */
    private static final long WAIT_TIMEOUT_MS = 15000L;
    /** Failure message shared by the counter assertions of {@link #testIncreasingIdZkResetLow()}. */
    private static final String UNEXPECTED_COUNTER_MSG = "Zk counter is not the expected value.";

    /**
     * Checks that assigned schema ids keep growing even after the counter node in
     * ZooKeeper is overwritten with {@code "0"}, both on the original instance and on a
     * second instance that takes over once the first one is stopped.
     *
     * @throws Exception if an app cannot be started/stopped or a registration fails
     */
    @Test
    public void testIncreasingIdZkResetLow() throws Exception {
        final String subject = "testSubject";
        RestApp restApp1 = new RestApp(choosePort(), this.zkConnect, "");
        restApp1.start();
        // Tracks whether restApp1 still has to be stopped by the outer finally block.
        boolean restApp1Running = true;
        try {
            List<String> schemas = TestUtils.getRandomCanonicalAvroString(ID_BATCH_SIZE);
            // Only half a batch is registered here.
            int maxId = registerExpectingIncreasingIds(
                    restApp1, schemas.subList(0, ID_BATCH_SIZE / 2), subject, -1);

            ZkClient zkClient = this.newZkClient();
            try {
                Assert.assertEquals(UNEXPECTED_COUNTER_MSG, ID_BATCH_SIZE, getZkIdCounter(zkClient));

                // Rewind the counter; the next id handed out must still exceed all earlier ones.
                ZkUtils.updatePersistentPath(zkClient, ZK_ID_COUNTER_PATH, "0");
                String anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
                int newId = restApp1.restClient.registerSchema(anotherSchema, subject);
                Assert.assertTrue("Next assigned id should be greater than all previous.", newId > maxId);
                maxId = newId;

                // Bring up the second instance before stopping the first, then wait for it to lead.
                RestApp restApp2 = new RestApp(choosePort(), this.zkConnect, "");
                restApp2.start();
                try {
                    restApp1.stop();
                    restApp1Running = false;
                    waitUntilMaster(restApp2, "Schema registry instance 2 should become the master");
                    Assert.assertEquals(UNEXPECTED_COUNTER_MSG, 2 * ID_BATCH_SIZE, getZkIdCounter(zkClient));

                    // Rewind again and register a full batch through the new master.
                    ZkUtils.updatePersistentPath(zkClient, ZK_ID_COUNTER_PATH, "0");
                    registerExpectingIncreasingIds(
                            restApp2, TestUtils.getRandomCanonicalAvroString(ID_BATCH_SIZE), subject, maxId);
                    Assert.assertEquals(UNEXPECTED_COUNTER_MSG, 3 * ID_BATCH_SIZE, getZkIdCounter(zkClient));
                } finally {
                    restApp2.stop();
                }
            } finally {
                zkClient.close();
            }
        } finally {
            if (restApp1Running) {
                restApp1.stop();
            }
        }
    }

    /**
     * Pre-creates the counter node with the value {@code ID_BATCH_SIZE - 1}, starts an
     * app, and expects the counter to read {@code 2 * ID_BATCH_SIZE} afterwards.
     *
     * @throws Exception if the app cannot be started or stopped
     */
    @Test
    public void testIdBehaviorWithZkWithoutKafka() throws Exception {
        ZkClient zkClient = this.newZkClient();
        try {
            // Deliberately not a multiple of the batch size.
            int weirdInitialCounterValue = ID_BATCH_SIZE - 1;
            ZkUtils.createPersistentPath(
                    zkClient, ZK_ID_COUNTER_PATH, String.valueOf(weirdInitialCounterValue));
            RestApp restApp = new RestApp(choosePort(), this.zkConnect, "");
            restApp.start();
            try {
                Assert.assertEquals(
                        "Incorrect ZK id counter after startup with a pre-seeded counter.",
                        2 * ID_BATCH_SIZE, getZkIdCounter(zkClient));
            } finally {
                restApp.stop();
            }
        } finally {
            zkClient.close();
        }
    }

    /**
     * Registers two schemas, stops the app, deletes the counter node, and expects a
     * freshly started app to leave the counter at {@code 2 * ID_BATCH_SIZE}.
     *
     * @throws Exception if an app cannot be started/stopped or a registration fails
     */
    @Test
    public void testIdBehaviorWithoutZkWithKafka() throws Exception {
        final int numSchemas = 2;
        final String subject = "testSubject";
        List<String> schemas = TestUtils.getRandomCanonicalAvroString(numSchemas);

        RestApp restApp = new RestApp(choosePort(), this.zkConnect, "");
        restApp.start();
        try {
            for (String schema : schemas) {
                int id = restApp.restClient.registerSchema(schema, subject);
                // Block until the id can be fetched back before moving on.
                this.waitUntilIdExists(restApp.restClient, id, "Expected id to be available.");
            }
        } finally {
            restApp.stop();
        }

        Assert.assertEquals("Incorrect ZK id counter.", ID_BATCH_SIZE, getZkIdCounter(this.zkClient));
        // Remove the counter node before the second app is started.
        this.zkClient.delete(ZK_ID_COUNTER_PATH);

        restApp = new RestApp(choosePort(), this.zkConnect, "");
        restApp.start();
        try {
            Assert.assertEquals(
                    "ZK id counter was incorrectly initialized.",
                    2 * ID_BATCH_SIZE, getZkIdCounter(this.zkClient));
        } finally {
            restApp.stop();
        }
    }

    /**
     * Expects the counter to equal {@code ID_BATCH_SIZE} right after a single app has started.
     *
     * @throws Exception if the app cannot be started or stopped
     */
    @Test
    public void testZkCounterOnStartup() throws Exception {
        RestApp restApp = new RestApp(choosePort(), this.zkConnect, "");
        restApp.start();
        try {
            Assert.assertEquals(
                    "Initial value of ZooKeeper id counter is incorrect.",
                    ID_BATCH_SIZE, getZkIdCounter(this.zkClient));
        } finally {
            restApp.stop();
        }
    }

    /**
     * Opens a ZooKeeper client against {@code zkConnect} using {@link ZkStringSerializer}.
     * The caller is responsible for closing it.
     */
    private ZkClient newZkClient() {
        return new ZkClient(this.zkConnect, ZK_TIMEOUT_MS, ZK_TIMEOUT_MS, new ZkStringSerializer());
    }

    /**
     * Registers each schema under {@code subject} through {@code restApp} and asserts that
     * every returned id is strictly greater than the largest id seen so far.
     *
     * @param restApp       app whose REST client performs the registrations
     * @param schemas       schemas to register, in order
     * @param subject       subject to register them under
     * @param previousMaxId largest id observed before this call
     * @return the largest id observed after all registrations
     * @throws Exception if a registration fails
     */
    private static int registerExpectingIncreasingIds(
            RestApp restApp, List<String> schemas, String subject, int previousMaxId) throws Exception {
        int maxId = previousMaxId;
        for (String schema : schemas) {
            int newId = restApp.restClient.registerSchema(schema, subject);
            Assert.assertTrue(
                    "new id " + newId + " should be greater than previous max " + maxId, newId > maxId);
            maxId = newId;
        }
        return maxId;
    }

    /** Polls {@code restApp.isMaster()} until it is true, failing with {@code errorMsg} on timeout. */
    private static void waitUntilMaster(final RestApp restApp, String errorMsg) {
        Callable<Boolean> electionComplete = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return restApp.isMaster();
            }
        };
        TestUtils.waitUntilTrue(electionComplete, WAIT_TIMEOUT_MS, errorMsg);
    }

    /** Reads the node at {@link #ZK_ID_COUNTER_PATH} and parses its payload as an int. */
    private static int getZkIdCounter(ZkClient zkClient) {
        return Integer.parseInt(ZkUtils.readData(zkClient, ZK_ID_COUNTER_PATH).getData());
    }

    /**
     * Polls {@code restService.getId(expectedId)} until it stops throwing
     * {@link RestClientException}, failing with {@code errorMsg} on timeout.
     */
    private void waitUntilIdExists(final RestService restService, final int expectedId, String errorMsg) {
        Callable<Boolean> canGetSchemaById = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                try {
                    restService.getId(expectedId);
                    return true;
                } catch (RestClientException e) {
                    // A client error is treated as "not available yet"; keep polling.
                    return false;
                }
            }
        };
        TestUtils.waitUntilTrue(canGetSchemaById, WAIT_TIMEOUT_MS, errorMsg);
    }
}

