package org.apache.hive.org.apache.zookeeper.test;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hive.org.apache.zookeeper.AsyncCallback;
import org.apache.hive.org.apache.zookeeper.CreateMode;
import org.apache.hive.org.apache.zookeeper.KeeperException;
import org.apache.hive.org.apache.zookeeper.PortAssignment;
import org.apache.hive.org.apache.zookeeper.WatchedEvent;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.ZooDefs;
import org.apache.hive.org.apache.zookeeper.ZooKeeper;
import org.apache.hive.org.apache.zookeeper.data.Stat;
import org.apache.hive.org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/hive/org/apache/zookeeper/test/ObserverTest.class */
public class ObserverTest extends QuorumPeerTestBase implements Watcher {
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) ObserverTest.class);
    CountDownLatch latch;
    ZooKeeper zk;
    WatchedEvent lastEvent = null;

    @Test
    public void testObserver() throws Exception {
        ClientBase.setupTestEnv();
        this.latch = new CountDownLatch(2);
        int unique = PortAssignment.unique();
        int unique2 = PortAssignment.unique();
        int unique3 = PortAssignment.unique();
        int unique4 = PortAssignment.unique();
        int unique5 = PortAssignment.unique();
        int unique6 = PortAssignment.unique();
        int unique7 = PortAssignment.unique();
        int unique8 = PortAssignment.unique();
        int unique9 = PortAssignment.unique();
        String str = "electionAlg=3\nserver.1=127.0.0.1:" + unique + ":" + unique4 + "\nserver.2=127.0.0.1:" + unique2 + ":" + unique5 + "\nserver.3=127.0.0.1:" + unique3 + ":" + unique6 + ":observer";
        String str2 = str + "\npeerType=observer";
        QuorumPeerTestBase.MainThread mainThread = new QuorumPeerTestBase.MainThread(1, unique7, str);
        QuorumPeerTestBase.MainThread mainThread2 = new QuorumPeerTestBase.MainThread(2, unique8, str);
        QuorumPeerTestBase.MainThread mainThread3 = new QuorumPeerTestBase.MainThread(3, unique9, str2);
        mainThread.start();
        mainThread2.start();
        mainThread3.start();
        Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + unique7, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + unique8, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + unique9, ClientBase.CONNECTION_TIMEOUT));
        this.zk = new ZooKeeper("127.0.0.1:" + unique9, ClientBase.CONNECTION_TIMEOUT, this);
        this.zk.create("/obstest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assert.assertEquals(new String(this.zk.getData("/obstest", (Watcher) null, (Stat) null)), "test");
        this.zk.sync("/", (AsyncCallback.VoidCallback) null, (Object) null);
        this.zk.setData("/obstest", "test2".getBytes(), -1);
        this.zk.getChildren("/", false);
        Assert.assertEquals(this.zk.getState(), ZooKeeper.States.CONNECTED);
        LOG.info("Shutting down server 2");
        mainThread2.shutdown();
        Assert.assertTrue("Waiting for server 2 to shut down", ClientBase.waitForServerDown("127.0.0.1:" + unique8, ClientBase.CONNECTION_TIMEOUT));
        LOG.info("Server 2 down");
        this.latch.await();
        Assert.assertNotSame("Client is still connected to non-quorate cluster", Watcher.Event.KeeperState.SyncConnected, this.lastEvent.getState());
        LOG.info("Latch returned");
        try {
            Assert.assertFalse("Shouldn't get a response when cluster not quorate!", new String(this.zk.getData("/obstest", (Watcher) null, (Stat) null)).equals("test"));
        } catch (KeeperException.ConnectionLossException e) {
            LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)");
        }
        this.latch = new CountDownLatch(1);
        LOG.info("Restarting server 2");
        QuorumPeerTestBase.MainThread mainThread4 = new QuorumPeerTestBase.MainThread(2, unique8, str);
        mainThread4.start();
        LOG.info("Waiting for server 2 to come up");
        Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + unique8, ClientBase.CONNECTION_TIMEOUT));
        LOG.info("Server 2 started, waiting for latch");
        this.latch.await();
        Assert.assertTrue("Client didn't reconnect to quorate ensemble (state was" + this.lastEvent.getState() + ")", Watcher.Event.KeeperState.SyncConnected == this.lastEvent.getState() || Watcher.Event.KeeperState.Expired == this.lastEvent.getState());
        LOG.info("Shutting down all servers");
        mainThread.shutdown();
        mainThread4.shutdown();
        mainThread3.shutdown();
        LOG.info("Closing zk client");
        this.zk.close();
        Assert.assertTrue("Waiting for server 1 to shut down", ClientBase.waitForServerDown("127.0.0.1:" + unique7, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("Waiting for server 2 to shut down", ClientBase.waitForServerDown("127.0.0.1:" + unique8, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("Waiting for server 3 to shut down", ClientBase.waitForServerDown("127.0.0.1:" + unique9, ClientBase.CONNECTION_TIMEOUT));
    }

    @Override // org.apache.hive.org.apache.zookeeper.server.quorum.QuorumPeerTestBase
    public void process(WatchedEvent watchedEvent) {
        this.lastEvent = watchedEvent;
        this.latch.countDown();
        LOG.info("Latch got event :: " + watchedEvent);
    }

    @Test
    public void testObserverOnly() throws Exception {
        ClientBase.setupTestEnv();
        QuorumPeerTestBase.MainThread mainThread = new QuorumPeerTestBase.MainThread(1, PortAssignment.unique(), "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":observer\npeerType=observer\n");
        mainThread.start();
        mainThread.join(ClientBase.CONNECTION_TIMEOUT);
        Assert.assertFalse(mainThread.isAlive());
    }

    @Test
    public void testObserverWithStandlone() throws Exception {
        ClientBase.setupTestEnv();
        QuorumPeerTestBase.MainThread mainThread = new QuorumPeerTestBase.MainThread(1, PortAssignment.unique(), "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":observer\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\npeerType=observer\n");
        mainThread.start();
        mainThread.join(ClientBase.CONNECTION_TIMEOUT);
        Assert.assertFalse(mainThread.isAlive());
    }
}
