/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.org.apache.zookeeper.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hive.org.apache.zookeeper.CreateMode;
import org.apache.hive.org.apache.zookeeper.KeeperException;
import org.apache.hive.org.apache.zookeeper.WatchedEvent;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.ZKTestCase;
import org.apache.hive.org.apache.zookeeper.ZooDefs;
import org.apache.hive.org.apache.zookeeper.ZooKeeper;
import org.apache.hive.org.apache.zookeeper.client.FourLetterWordMain;
import org.apache.hive.org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.hive.org.apache.zookeeper.server.util.PortForwarder;
import org.apache.hive.org.apache.zookeeper.test.ClientBase;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.junit.Assert;
import org.junit.Test;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class TruncateCorruptionTest
extends ZKTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TruncateCorruptionTest.class);

    /**
     * Polls {@code check} until it reports success or the timeout elapses.
     *
     * <p>The check is evaluated, then the thread sleeps 50 ms before the next attempt. The check is only
     * evaluated while time remains, so a non-positive {@code timeoutMillis} returns {@code false}
     * without ever invoking it.
     *
     * @param check condition to poll
     * @param timeoutMillis maximum time to keep polling, in milliseconds
     * @return {@code true} as soon as the check succeeds, {@code false} if the timeout expired first
     * @throws InterruptedException if the calling thread is interrupted while sleeping between polls
     */
    public static boolean await(Check check, long timeoutMillis) throws InterruptedException {
        // Measure elapsed time with the monotonic clock: a wall-clock adjustment while the test is
        // running must neither stretch nor cut the wait.
        long startNanos = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (System.nanoTime() - startNanos < timeoutNanos) {
            if (check.doCheck()) {
                long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                LOG.debug("await succeeded after {}", Long.valueOf(elapsedMillis));
                return true;
            }
            Thread.sleep(50L);
        }
        LOG.debug("await failed in {}", Long.valueOf(timeoutMillis));
        return false;
    }

    /**
     * Drives a three-server ensemble through a series of restarts and a simulated network interruption,
     * then verifies that the last znode created ({@code /test3}) is listed by both the zk1 and the zk2
     * client and that both clients report the same children of {@code /}.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Start servers 2 and 3, open the port forwarders, then start server 1.</li>
     *   <li>Through zk1 (client port 8201) create {@code /test}, stop server 2, create {@code /test2},
     *       stop server 3 and shut the forwarders down; wait until zk1 notices the disconnect.</li>
     *   <li>Clean server 3, restart servers 2 and 3, connect zk2 (client port 8202), reopen the
     *       forwarders, wait for zk1 to reconnect and create {@code /test3}.</li>
     *   <li>Stop servers 2 and 3, clean server 3 again, start server 3 and then server 2, and compare the
     *       children seen through zk1 and zk2.</li>
     * </ol>
     *
     * <p>Whatever was started is torn down again in a {@code finally} block so that a failing run does
     * not leave clients, forwarders or servers behind on the fixed ports this test uses.
     *
     * @throws Exception if any server, client or forwarder operation fails
     */
    @Test
    public void testTransactionLogCorruption() throws Exception {
        // Server 1 is configured with quorum ports based at 7000, servers 2 and 3 with ports based at
        // 8000. Presumably the forwarders bridge the two ranges, so shutting them down cuts server 1 off
        // from the others (the log message further down calls this interrupting the network connection).
        ZookeeperServerWrapper wrapper1 = new ZookeeperServerWrapper(1, 7000);
        ZookeeperServerWrapper wrapper2 = new ZookeeperServerWrapper(2, 8000);
        ZookeeperServerWrapper wrapper3 = new ZookeeperServerWrapper(3, 8000);
        List<ZooKeeper> clients = new ArrayList<ZooKeeper>();
        // Non-null only while forwarders are open and still need to be shut down.
        List<PortForwarder> pfs = null;
        // The payload is plain ASCII; naming the charset keeps it independent of the platform default.
        byte[] payload = "testdata".getBytes("UTF-8");
        try {
            wrapper2.start();
            wrapper3.start();
            try {
                wrapper2.await(ClientBase.CONNECTION_TIMEOUT);
                wrapper3.await(ClientBase.CONNECTION_TIMEOUT);
            }
            catch (Exception e) {
                ClientBase.logAllStackTraces();
                throw e;
            }
            pfs = this.startForwarding();
            Thread.sleep(1000L);
            wrapper1.start();
            wrapper1.await(ClientBase.CONNECTION_TIMEOUT);
            ZooKeeper zk1 = new ZooKeeper("localhost:8201", ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk1"));
            clients.add(zk1);
            this.waitForConnection(zk1);
            zk1.create("/test", payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Thread.sleep(1000L);
            wrapper2.stop();
            this.waitForConnection(zk1);
            zk1.create("/test2", payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            wrapper3.stop();
            this.stopForwarding(pfs);
            pfs = null;
            LOG.info("interrupted network connection ... waiting for zk1 and zk2 to realize");
            this.awaitConnectionLoss(zk1);
            wrapper3.clean();
            wrapper2.start();
            wrapper3.start();
            LOG.info("Waiting for zk2 and zk3 to form a quorum");
            wrapper2.await(ClientBase.CONNECTION_TIMEOUT);
            wrapper3.await(ClientBase.CONNECTION_TIMEOUT);
            ZooKeeper zk2 = new ZooKeeper("localhost:8202", ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk2"));
            clients.add(zk2);
            this.waitForConnection(zk2);
            LOG.info("re-establishing network connection and waiting for zk1 to reconnect");
            pfs = this.startForwarding();
            this.waitForConnection(zk1);
            LOG.info("Creating node test3");
            zk1.create("/test3", payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Thread.sleep(250L);
            LOG.info("List of children at zk2 before zk1 became master");
            List<String> children2 = zk2.getChildren("/", false);
            LOG.info(children2.toString());
            LOG.info("List of children at zk1 before zk1 became master");
            List<String> children1 = zk1.getChildren("/", false);
            LOG.info(children1.toString());
            LOG.info("restarting zk2 and zk3 while cleaning zk3 to enforce zk1 to become master");
            wrapper2.stop();
            wrapper3.stop();
            wrapper3.clean();
            wrapper3.start();
            wrapper3.await(TimeUnit.MINUTES.toMillis(2L));
            ZooKeeper zk3 = new ZooKeeper("localhost:8203", ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk3"));
            clients.add(zk3);
            this.waitForConnection(zk3);
            LOG.info("Zk1 and zk3 have a quorum, now starting zk2");
            wrapper2.start();
            this.waitForConnection(zk2);
            LOG.info("List of children at zk2");
            children2 = zk2.getChildren("/", false);
            LOG.info(children2.toString());
            this.waitForConnection(zk1);
            LOG.info("List of children at zk1");
            children1 = zk1.getChildren("/", false);
            Assert.assertTrue("test3 node is missing on zk1", children1.contains("test3"));
            Assert.assertTrue("test3 node is missing on zk2", children2.contains("test3"));
            Assert.assertEquals(children1, children2);
            this.stopForwarding(pfs);
            pfs = null;
        }
        finally {
            // Best-effort teardown: failures here are logged, never thrown, so they cannot mask the
            // exception (if any) that ended the scenario above.
            for (ZooKeeper client : clients) {
                TruncateCorruptionTest.closeQuietly(client);
            }
            if (pfs != null) {
                try {
                    this.stopForwarding(pfs);
                }
                catch (Exception e) {
                    LOG.warn("Failed to shut down port forwarders during teardown", e);
                }
            }
            TruncateCorruptionTest.stopQuietly(wrapper1);
            TruncateCorruptionTest.stopQuietly(wrapper2);
            TruncateCorruptionTest.stopQuietly(wrapper3);
        }
    }

    /**
     * Waits up to two minutes for {@code zk} to notice that it lost its server: the client must be in
     * state {@code CONNECTING} and a read of {@code /} must either fail with a connection loss or
     * return a non-empty list.
     */
    private void awaitConnectionLoss(final ZooKeeper zk) throws InterruptedException {
        boolean noticed = TruncateCorruptionTest.await(new Check(){

            @Override
            public boolean doCheck() {
                if (zk.getState() != ZooKeeper.States.CONNECTING) {
                    return false;
                }
                try {
                    List<String> children = zk.getChildren("/", false);
                    return !children.isEmpty();
                }
                catch (KeeperException.ConnectionLossException e) {
                    return true;
                }
                catch (InterruptedException e) {
                    // Restore the flag so the polling loop's next sleep aborts instead of spinning on.
                    Thread.currentThread().interrupt();
                    return false;
                }
                catch (Exception e) {
                    LOG.debug("Ignoring failure while polling for connection loss", e);
                    return false;
                }
            }
        }, TimeUnit.MINUTES.toMillis(2L));
        Assert.assertTrue("client did not notice the interrupted connection in time", noticed);
    }

    /** Closes a client during teardown, logging instead of propagating any failure. */
    private static void closeQuietly(ZooKeeper zk) {
        try {
            zk.close();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while closing ZooKeeper client", e);
        }
        catch (RuntimeException e) {
            LOG.warn("Failed to close ZooKeeper client", e);
        }
    }

    /**
     * Stops a server during teardown. The wrapper may never have been started or may already be
     * stopped; whether stop() tolerates that is not visible from here, so runtime failures are logged
     * rather than propagated.
     */
    private static void stopQuietly(ZookeeperServerWrapper wrapper) {
        try {
            wrapper.stop();
        }
        catch (RuntimeException e) {
            LOG.warn("Failed to stop zookeeper server during teardown", e);
        }
    }

    /**
     * Shuts down every forwarder in {@code pfs}.
     *
     * <p>All forwarders are attempted even if one of them fails, so a single failure does not leave the
     * remaining ones running. The first failure is rethrown once the loop is done; later failures are
     * only logged.
     *
     * @param pfs forwarders to shut down
     * @throws Exception the first exception raised by a forwarder's {@code shutdown()}
     */
    private void stopForwarding(List<PortForwarder> pfs) throws Exception {
        Exception firstFailure = null;
        for (PortForwarder pf : pfs) {
            try {
                pf.shutdown();
            }
            catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    LOG.warn("Additional failure while shutting down port forwarder", e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Opens the six port forwarders used by this test and returns them in creation order.
     *
     * <p>If opening one of them fails, the forwarders that were already opened are shut down before
     * the failure propagates, so a partial setup does not leak.
     *
     * @return the opened forwarders
     * @throws IOException if a forwarder cannot be created
     */
    private List<PortForwarder> startForwarding() throws IOException {
        // Constructor argument pairs, in the order the forwarders are opened.
        int[][] portPairs = {
            {8301, 7301},
            {8401, 7401},
            {7302, 8302},
            {7402, 8402},
            {7303, 8303},
            {7403, 8403},
        };
        List<PortForwarder> res = new ArrayList<PortForwarder>(portPairs.length);
        boolean complete = false;
        try {
            for (int[] pair : portPairs) {
                res.add(new PortForwarder(pair[0], pair[1]));
            }
            complete = true;
            return res;
        }
        finally {
            if (!complete) {
                for (PortForwarder pf : res) {
                    try {
                        pf.shutdown();
                    }
                    catch (Exception e) {
                        LOG.warn("Failed to shut down port forwarder after aborted setup", e);
                    }
                }
            }
        }
    }

    /**
     * Waits up to two minutes until {@code zk} is in state {@code CONNECTED} and a read of the children
     * of {@code /} succeeds with a non-empty result; fails the test otherwise.
     *
     * @param zk client to wait for
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void waitForConnection(final ZooKeeper zk) throws InterruptedException {
        boolean connected = TruncateCorruptionTest.await(new Check(){

            @Override
            public boolean doCheck() {
                if (zk.getState() != ZooKeeper.States.CONNECTED) {
                    return false;
                }
                try {
                    List<String> children = zk.getChildren("/", false);
                    return !children.isEmpty();
                }
                catch (InterruptedException e) {
                    // Restore the flag so the polling loop's next sleep aborts instead of spinning on.
                    Thread.currentThread().interrupt();
                    return false;
                }
                catch (Exception e) {
                    // The read can fail while the connection is still settling; keep polling.
                    LOG.debug("Ignoring failure while polling for connection", e);
                    return false;
                }
            }
        }, TimeUnit.MINUTES.toMillis(2L));
        Assert.assertTrue("ZooKeeper client did not become usable in time", connected);
    }

    /**
     * Thin handle around one {@link QuorumPeerTestBase.MainThread} of a three-server ensemble.
     *
     * <p>The client port is {@code 8200 + serverId}. The quorum section lists servers 1 to 3 on
     * 127.0.0.1 with ports {@code portBase + 300 + id} and {@code portBase + 400 + id}.
     */
    public static class ZookeeperServerWrapper {
        private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerWrapper.class);
        private final QuorumPeerTestBase.MainThread server;
        private final int clientPort;

        /**
         * @param serverId id of this server within the ensemble
         * @param portBase base added to the quorum ports written into this server's configuration
         * @throws IOException if the underlying server thread cannot be set up
         */
        public ZookeeperServerWrapper(int serverId, int portBase) throws IOException {
            this.clientPort = 8200 + serverId;
            StringBuilder quorumCfgSection = new StringBuilder();
            for (int id = 1; id <= 3; id++) {
                if (id > 1) {
                    quorumCfgSection.append('\n');
                }
                quorumCfgSection.append("server.").append(id)
                        .append("=127.0.0.1:").append(portBase + 300 + id)
                        .append(':').append(portBase + 400 + id);
            }
            this.server = new QuorumPeerTestBase.MainThread(serverId, this.clientPort, quorumCfgSection.toString());
        }

        /** Starts the server thread. */
        public void start() throws Exception {
            this.server.start();
        }

        /**
         * Polls the server with the {@code stat} four-letter word every 100 ms until the reply starts
         * with {@code "Zookeeper version:"}.
         *
         * @param timeout maximum time to wait, in milliseconds
         * @throws InterruptedException if the calling thread is interrupted between polls
         * @throws Exception if no such reply arrives in time; the last I/O failure, if any, is attached
         *         as the cause
         */
        public void await(long timeout) throws Exception {
            // Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
            long startNanos = System.nanoTime();
            long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            String result = "?";
            IOException lastFailure = null;
            while (System.nanoTime() - startNanos < timeoutNanos) {
                try {
                    result = FourLetterWordMain.send4LetterWord("127.0.0.1", this.clientPort, "stat");
                    if (result.startsWith("Zookeeper version:")) {
                        LOG.info("Started zookeeper server on port {}", Integer.valueOf(this.clientPort));
                        return;
                    }
                }
                catch (IOException e) {
                    // Expected while the server is still coming up; remember it for the final report.
                    lastFailure = e;
                }
                // An interrupt is deliberately allowed to propagate instead of being swallowed.
                Thread.sleep(100L);
            }
            LOG.info(result);
            throw new Exception("Failed to connect to zookeeper server", lastFailure);
        }

        /** Shuts the server down; an interrupt is logged and the thread's interrupt flag restored. */
        public void stop() {
            try {
                this.server.shutdown();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.info("Interrupted while shutting down");
            }
        }

        /** Delegates to the server thread's {@code clean()}. */
        public void clean() throws IOException {
            this.server.clean();
        }
    }

    static class ZkWatcher
    implements Watcher {
        private final String clientId;

        ZkWatcher(String clientId) {
            this.clientId = clientId;
        }

        public void process(WatchedEvent event) {
            LOG.info("<<<EVENT>>> " + this.clientId + " - WatchedEvent: " + event);
        }
    }

    /** A condition that can be polled repeatedly, e.g. by {@code await(Check, long)}. */
    public interface Check {
        /**
         * Evaluates the condition once.
         *
         * @return {@code true} if the condition currently holds
         */
        boolean doCheck();
    }
}

