/*
 * Decompiled with CFR 0.152.
 */
package hidden.bkjournal.org.apache.bookkeeper.client;

import hidden.bkjournal.org.apache.bookkeeper.client.AsyncCallback;
import hidden.bkjournal.org.apache.bookkeeper.client.BKException;
import hidden.bkjournal.org.apache.bookkeeper.client.BookKeeper;
import hidden.bkjournal.org.apache.bookkeeper.client.LedgerEntry;
import hidden.bkjournal.org.apache.bookkeeper.client.LedgerHandle;
import hidden.bkjournal.org.apache.bookkeeper.conf.ClientConfiguration;
import hidden.bkjournal.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import hidden.bkjournal.org.apache.zookeeper.AsyncCallback;
import hidden.bkjournal.org.apache.zookeeper.KeeperException;
import hidden.bkjournal.org.apache.zookeeper.WatchedEvent;
import hidden.bkjournal.org.apache.zookeeper.Watcher;
import hidden.bkjournal.org.apache.zookeeper.ZooKeeper;
import hidden.bkjournal.org.apache.zookeeper.data.Stat;
import hidden.bkjournal.org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookKeeperAdmin {
    /** Logger shared by this class and its nested callbacks. */
    private static final Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class);
    /** Separator used to split a bookie znode name into its two address parts. */
    static final String COLON = ":";
    /** ZooKeeper handle created in the constructor and closed by {@link #close()}. */
    private final ZooKeeper zk;
    /** Znode whose children are listed to discover available bookies. */
    static final String BOOKIES_PATH = "/ledgers/available";
    /** BookKeeper client created in the constructor and closed by {@link #close()}. */
    private final BookKeeper bkc;
    /** Source of randomness for choosing a replacement bookie. */
    private final Random rand = new Random();
    /** Digest type taken from the client configuration; used to open every ledger during recovery. */
    private final BookKeeper.DigestType DIGEST_TYPE;
    /** Password taken from the client configuration; used to open every ledger during recovery. */
    private final byte[] PASSWD;

    /**
     * Creates an admin client from a ZooKeeper server string.
     *
     * <p>Delegates to {@link #BookKeeperAdmin(ClientConfiguration)} with a new
     * {@code ClientConfiguration} on which only {@code setZkServers(zkServers)}
     * has been called.
     *
     * @param zkServers value passed to {@code ClientConfiguration.setZkServers}
     * @throws IOException propagated from the delegated constructor
     * @throws InterruptedException propagated from the delegated constructor
     * @throws KeeperException propagated from the delegated constructor
     */
    public BookKeeperAdmin(String zkServers) throws IOException, InterruptedException, KeeperException {
        this(new ClientConfiguration().setZkServers(zkServers));
    }

    /**
     * Creates an admin client from a client configuration.
     *
     * <p>Opens a ZooKeeper handle (with a watcher that only logs events at debug
     * level) and a BookKeeper client, then reads the recovery digest type and
     * password from {@code conf}. If the BookKeeper client cannot be created,
     * the ZooKeeper handle that was already opened is closed before the
     * exception propagates, so a failed construction does not leak a session.
     *
     * @param conf configuration supplying the ZooKeeper servers, ZooKeeper
     *             timeout and the recovery digest type / password
     * @throws IOException if creating the ZooKeeper handle or BookKeeper client fails
     * @throws InterruptedException if interrupted while creating the BookKeeper client
     * @throws KeeperException if creating the BookKeeper client fails on ZooKeeper
     */
    public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
        this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher(){

            @Override
            public void process(WatchedEvent event) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
                }
            }
        });
        boolean bkcCreated = false;
        try {
            this.bkc = new BookKeeper(conf);
            bkcCreated = true;
        }
        finally {
            if (!bkcCreated) {
                // Release the ZooKeeper session opened above; keep the original
                // exception as the one the caller sees.
                try {
                    this.zk.close();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.DIGEST_TYPE = conf.getBookieRecoveryDigestType();
        this.PASSWD = conf.getBookieRecoveryPasswd();
    }

    /**
     * Closes the BookKeeper client and then the ZooKeeper handle.
     *
     * <p>The ZooKeeper handle is closed even when closing the BookKeeper client
     * throws, so a failure in the first step does not leak the session.
     *
     * @throws InterruptedException if either close call is interrupted
     * @throws BKException if closing the BookKeeper client fails
     */
    public void close() throws InterruptedException, BKException {
        try {
            this.bkc.close();
        }
        finally {
            this.zk.close();
        }
    }

    /**
     * Returns the digest type used to open a ledger during recovery.
     *
     * <p>The ledger id is not consulted; every ledger gets the digest type
     * stored in {@code DIGEST_TYPE}.
     *
     * @param ledgerId id of the ledger about to be opened (currently ignored)
     * @return the configured recovery digest type
     */
    private BookKeeper.DigestType getLedgerDigestType(long ledgerId) {
        return DIGEST_TYPE;
    }

    /**
     * Returns the password used to open a ledger during recovery.
     *
     * <p>The ledger id is not consulted; every ledger gets the same
     * {@code PASSWD} array (the field itself, not a copy).
     *
     * @param ledgerId id of the ledger about to be opened (currently ignored)
     * @return the configured recovery password
     */
    private byte[] getLedgerPasswd(long ledgerId) {
        return PASSWD;
    }

    /**
     * Blocking variant of {@link #asyncRecoverBookieData}.
     *
     * <p>Starts the asynchronous recovery with a {@code SyncObject} as context
     * and waits on that object until the completion callback has stored the
     * result code and set its {@code value} flag.
     *
     * @param bookieSrc address of the bookie whose data is to be recovered
     * @param bookieDest bookie to replicate to; may be {@code null}, in which
     *                   case candidates are read from ZooKeeper
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws BKException created from the result code when it is non-zero
     */
    public void recoverBookieData(InetSocketAddress bookieSrc, InetSocketAddress bookieDest) throws InterruptedException, BKException {
        final SyncObject completion = new SyncObject();
        AsyncCallback.RecoverCallback onRecovered = new AsyncCallback.RecoverCallback(){

            @Override
            public void recoverComplete(int rc, Object ctx) {
                LOG.info("Recover bookie operation completed with rc: " + rc);
                // The context is expected to be the SyncObject passed in below.
                SyncObject waiter = (SyncObject)ctx;
                synchronized (waiter) {
                    waiter.rc = rc;
                    waiter.value = true;
                    waiter.notify();
                }
            }
        };
        this.asyncRecoverBookieData(bookieSrc, bookieDest, onRecovered, completion);
        synchronized (completion) {
            // Loop guards against wake-ups that happen before the flag is set.
            while (!completion.value) {
                completion.wait();
            }
        }
        if (completion.rc == 0) {
            return;
        }
        throw BKException.create(completion.rc);
    }

    /**
     * Starts recovery of the data held by {@code bookieSrc} without blocking.
     *
     * <p>First issues a ZooKeeper {@code sync} on {@code BOOKIES_PATH}. When the
     * sync succeeds, processing continues in {@code getAvailableBookies};
     * otherwise {@code cb} is completed with code {@code -9}
     * (NOTE(review): presumably the BookKeeper "ZooKeeper error" code — confirm).
     *
     * @param bookieSrc address of the bookie whose data is to be recovered
     * @param bookieDest bookie to replicate to; may be {@code null}
     * @param cb callback invoked when recovery finishes or fails
     * @param context opaque object handed back to {@code cb}
     */
    public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest, final AsyncCallback.RecoverCallback cb, final Object context) {
        AsyncCallback.VoidCallback onSynced = new AsyncCallback.VoidCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (rc == KeeperException.Code.OK.intValue()) {
                    BookKeeperAdmin.this.getAvailableBookies(bookieSrc, bookieDest, cb, context);
                    return;
                }
                LOG.error("ZK error syncing: ", (Throwable)KeeperException.create(KeeperException.Code.get(rc), path));
                cb.recoverComplete(-9, context);
            }
        };
        this.zk.sync(BOOKIES_PATH, onSynced, null);
    }

    /**
     * Builds the list of bookies that may receive replicated data and then
     * continues with {@code getActiveLedgers}.
     *
     * <p>When {@code bookieDest} is given it is the only candidate. Otherwise
     * the children of {@code BOOKIES_PATH} are read from ZooKeeper and each
     * child name is split on {@code COLON} into a host and a port.
     *
     * <p>Any failure (ZooKeeper error, a child name with fewer than two parts,
     * or a port that is not a valid number / out of range) completes
     * {@code cb} with code {@code -9}
     * (NOTE(review): presumably the BookKeeper "ZooKeeper error" code — confirm).
     * A malformed port is reported through the callback rather than thrown,
     * because an exception escaping the ZooKeeper callback would leave
     * {@code cb} uncalled and a blocking caller waiting forever.
     *
     * @param bookieSrc address of the bookie whose data is to be recovered
     * @param bookieDest explicit replication target, or {@code null} to discover targets
     * @param cb callback invoked when recovery finishes or fails
     * @param context opaque object handed back to {@code cb}
     */
    private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest, final AsyncCallback.RecoverCallback cb, final Object context) {
        final LinkedList<InetSocketAddress> availableBookies = new LinkedList<InetSocketAddress>();
        if (bookieDest != null) {
            availableBookies.add(bookieDest);
            this.getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
        } else {
            this.zk.getChildren(BOOKIES_PATH, null, new AsyncCallback.ChildrenCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx, List<String> children) {
                    if (rc != KeeperException.Code.OK.intValue()) {
                        LOG.error("ZK error getting bookie nodes: ", (Throwable)KeeperException.create(KeeperException.Code.get(rc), path));
                        cb.recoverComplete(-9, context);
                        return;
                    }
                    for (String bookieNode : children) {
                        String[] parts = bookieNode.split(BookKeeperAdmin.COLON);
                        if (parts.length < 2) {
                            LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
                            cb.recoverComplete(-9, context);
                            return;
                        }
                        try {
                            availableBookies.add(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])));
                        }
                        catch (IllegalArgumentException e) {
                            // Covers NumberFormatException from parseInt and the
                            // out-of-range port check in InetSocketAddress.
                            LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode, (Throwable)e);
                            cb.recoverComplete(-9, context);
                            return;
                        }
                    }
                    BookKeeperAdmin.this.getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
                }
            }, null);
        }
    }

    /**
     * Runs {@code recoverLedger} for every ledger reported by the ledger
     * manager and completes {@code cb} once the iteration has finished.
     *
     * <p>The trailing {@code 0} and {@code -10} are passed to
     * {@code asyncProcessLedgers} unchanged (NOTE(review): presumably the
     * success and failure result codes — confirm against the ledger manager).
     *
     * @param bookieSrc address of the bookie whose data is to be recovered
     * @param bookieDest not used by this method
     * @param cb callback completed with the overall result
     * @param context opaque object handed to the ledger manager
     * @param availableBookies candidate bookies for replacement
     */
    private void getActiveLedgers(final InetSocketAddress bookieSrc, InetSocketAddress bookieDest, AsyncCallback.RecoverCallback cb, Object context, final List<InetSocketAddress> availableBookies) {
        // Adapts the void-style completion of the ledger iteration to the recover callback.
        class RecoverCallbackWrapper
        implements AsyncCallback.VoidCallback {
            final AsyncCallback.RecoverCallback cb;

            RecoverCallbackWrapper(AsyncCallback.RecoverCallback wrapped) {
                this.cb = wrapped;
            }

            @Override
            public void processResult(int rc, String path, Object ctx) {
                this.cb.recoverComplete(rc, ctx);
            }
        }
        AsyncCallback.VoidCallback onAllLedgersDone = new RecoverCallbackWrapper(cb);
        BookkeeperInternalCallbacks.Processor<Long> perLedger = new BookkeeperInternalCallbacks.Processor<Long>(){

            @Override
            public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
                BookKeeperAdmin.this.recoverLedger(bookieSrc, ledgerId, iterCallback, availableBookies);
            }
        };
        this.bkc.getLedgerManager().asyncProcessLedgers(perLedger, onAllLedgersDone, context, 0, -10);
    }

    /**
     * Picks, at random, an available bookie that is not already part of the
     * given ensemble.
     *
     * @param bookiesAlreadyInEnsemble bookies that must not be chosen
     * @param availableBookies all candidate bookies; not modified
     * @return one randomly selected candidate
     * @throws BKException.BKNotEnoughBookiesException if no candidate remains
     *         after removing the ensemble members
     */
    private InetSocketAddress getNewBookie(List<InetSocketAddress> bookiesAlreadyInEnsemble, List<InetSocketAddress> availableBookies) throws BKException.BKNotEnoughBookiesException {
        List<InetSocketAddress> eligible = new ArrayList<InetSocketAddress>(availableBookies);
        eligible.removeAll(bookiesAlreadyInEnsemble);
        if (eligible.isEmpty()) {
            throw new BKException.BKNotEnoughBookiesException();
        }
        int pick = this.rand.nextInt(eligible.size());
        return eligible.get(pick);
    }

    /**
     * Recovers the fragments of one ledger that were stored on {@code bookieSrc}.
     *
     * <p>Opens the ledger without recovery, walks its ensemble map to find the
     * fragments whose ensemble contains {@code bookieSrc}, and for each such
     * fragment picks a replacement bookie and starts
     * {@code recoverLedgerFragment}. A fragment's end entry is the next
     * fragment's start minus one; the last fragment therefore has no end entry.
     *
     * <p>{@code ledgerIterCb} is completed directly when the open fails or when
     * no fragment needs recovery; otherwise it is driven by a
     * {@code MultiCallback} expecting one result per fragment. The literals
     * {@code 0}, {@code -10} and {@code -6} are passed through unchanged
     * (NOTE(review): presumably success, ledger-recovery failure and
     * not-enough-bookies codes — confirm).
     *
     * <p>NOTE(review): when {@code recoverLedgerFragment} is interrupted this
     * method returns without reporting the remaining fragments to the
     * multi-callback — confirm whether callers can then wait indefinitely.
     *
     * @param bookieSrc address of the bookie whose data is to be recovered
     * @param lId id of the ledger to inspect
     * @param ledgerIterCb completion callback for this ledger
     * @param availableBookies candidate bookies for replacement
     */
    private void recoverLedger(final InetSocketAddress bookieSrc, final long lId, final AsyncCallback.VoidCallback ledgerIterCb, final List<InetSocketAddress> availableBookies) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Recovering ledger : " + lId);
        }
        BookKeeper.DigestType digestType = this.getLedgerDigestType(lId);
        byte[] passwd = this.getLedgerPasswd(lId);
        AsyncCallback.OpenCallback onOpened = new AsyncCallback.OpenCallback(){

            @Override
            public void openComplete(int rc, LedgerHandle lh, Object ctx) {
                if (rc != KeeperException.Code.OK.intValue()) {
                    LOG.error("BK error opening ledger: " + lId, (Throwable)BKException.create(rc));
                    ledgerIterCb.processResult(rc, null, null);
                    return;
                }
                // Start ids of fragments that include the dead bookie, and the
                // last entry id of every fragment except the final one.
                List<Long> fragmentsOnDeadBookie = new LinkedList<Long>();
                Map<Long, Long> fragmentEndByStart = new HashMap<Long, Long>();
                Long previousStart = null;
                for (Map.Entry<Long, ArrayList<InetSocketAddress>> fragment : lh.getLedgerMetadata().getEnsembles().entrySet()) {
                    Long fragmentStart = fragment.getKey();
                    if (previousStart != null) {
                        fragmentEndByStart.put(previousStart, fragmentStart - 1L);
                    }
                    previousStart = fragmentStart;
                    if (fragment.getValue().contains(bookieSrc)) {
                        fragmentsOnDeadBookie.add(fragmentStart);
                    }
                }
                if (fragmentsOnDeadBookie.isEmpty()) {
                    ledgerIterCb.processResult(0, null, null);
                    return;
                }
                BookkeeperInternalCallbacks.MultiCallback ledgerFragmentsMcb = new BookkeeperInternalCallbacks.MultiCallback(fragmentsOnDeadBookie.size(), ledgerIterCb, null, 0, -10);
                for (Long startEntryId : fragmentsOnDeadBookie) {
                    Long endEntryId = fragmentEndByStart.get(startEntryId);
                    InetSocketAddress newBookie;
                    try {
                        newBookie = BookKeeperAdmin.this.getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId), availableBookies);
                    }
                    catch (BKException.BKNotEnoughBookiesException bke) {
                        ledgerFragmentsMcb.processResult(-6, null, null);
                        continue;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Replicating fragment from [" + startEntryId + "," + endEntryId + "] of ledger " + lh.getId() + " to " + newBookie);
                    }
                    SingleFragmentCallback fragmentDone = new SingleFragmentCallback(ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
                    try {
                        BookKeeperAdmin.this.recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, fragmentDone, newBookie);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
        this.bkc.asyncOpenLedgerNoRecovery(lId, digestType, passwd, onOpened, null);
    }

    /**
     * Replicates the entries of one ledger fragment that were stored on
     * {@code bookieSrc} to {@code newBookie}.
     *
     * <p>A {@code null} {@code endEntryId} means the fragment has no known end;
     * a warning is logged and {@code cb} is completed with {@code 0} without
     * copying anything. Otherwise the position of {@code bookieSrc} in the
     * fragment's ensemble is looked up (position 0 is used if it is not found),
     * every entry id in {@code [startEntryId, endEntryId]} for which the
     * distribution schedule returns a non-negative replica index for that
     * position is collected, and each one is handed to
     * {@code recoverLedgerFragmentEntry}.
     *
     * @param bookieSrc address of the bookie whose data is to be recovered
     * @param lh handle of the ledger being recovered
     * @param startEntryId first entry id of the fragment
     * @param endEntryId last entry id of the fragment, or {@code null}
     * @param cb callback for the fragment as a whole
     * @param newBookie bookie receiving the replicated entries
     * @throws InterruptedException propagated from {@code recoverLedgerFragmentEntry}
     */
    private void recoverLedgerFragment(InetSocketAddress bookieSrc, LedgerHandle lh, Long startEntryId, Long endEntryId, SingleFragmentCallback cb, InetSocketAddress newBookie) throws InterruptedException {
        if (endEntryId == null) {
            LOG.warn("Dead bookie (" + bookieSrc + ") is still part of the current active ensemble for ledgerId: " + lh.getId());
            cb.processResult(0, null, null);
            return;
        }
        ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
        int deadBookieSlot = 0;
        int slot = 0;
        while (slot < ensemble.size()) {
            if (ensemble.get(slot).equals(bookieSrc)) {
                deadBookieSlot = slot;
                break;
            }
            ++slot;
        }
        List<Long> entriesToReplicate = new LinkedList<Long>();
        for (long entryId = startEntryId.longValue(); entryId <= endEntryId; ++entryId) {
            if (lh.getDistributionSchedule().getReplicaIndex(entryId, deadBookieSlot) >= 0) {
                entriesToReplicate.add(entryId);
            }
        }
        BookkeeperInternalCallbacks.MultiCallback ledgerFragmentEntryMcb = new BookkeeperInternalCallbacks.MultiCallback(entriesToReplicate.size(), cb, null, 0, -10);
        for (Long entryId : entriesToReplicate) {
            this.recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
        }
    }

    /**
     * Copies a single ledger entry to {@code newBookie}.
     *
     * <p>Reads the entry through the ledger handle, recomputes the digest
     * package for it, and writes it to {@code newBookie} via the bookie client.
     * The result code of the read (on failure) or of the write is reported to
     * {@code ledgerFragmentEntryMcb}. The trailing {@code 2} is passed to
     * {@code addEntry} unchanged (NOTE(review): presumably an options flag
     * marking a recovery add — confirm).
     *
     * @param entryId id of the entry to copy
     * @param lh handle of the ledger being recovered
     * @param ledgerFragmentEntryMcb receives one result per entry
     * @param newBookie bookie receiving the entry
     * @throws InterruptedException declared for the asynchronous read call
     */
    private void recoverLedgerFragmentEntry(final Long entryId, LedgerHandle lh, final BookkeeperInternalCallbacks.MultiCallback ledgerFragmentEntryMcb, final InetSocketAddress newBookie) throws InterruptedException {
        AsyncCallback.ReadCallback onRead = new AsyncCallback.ReadCallback(){

            @Override
            public void readComplete(int rc, LedgerHandle readHandle, Enumeration<LedgerEntry> seq, Object ctx) {
                if (rc != KeeperException.Code.OK.intValue()) {
                    LOG.error("BK error reading ledger entry: " + entryId, (Throwable)BKException.create(rc));
                    ledgerFragmentEntryMcb.processResult(rc, null, null);
                    return;
                }
                BookkeeperInternalCallbacks.WriteCallback onWritten = new BookkeeperInternalCallbacks.WriteCallback(){

                    @Override
                    public void writeComplete(int writeRc, long ledgerId, long writtenEntryId, InetSocketAddress addr, Object writeCtx) {
                        if (writeRc == KeeperException.Code.OK.intValue()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Success writing ledger id " + ledgerId + ", entry id " + writtenEntryId + " to a new bookie " + addr + "!");
                            }
                        } else {
                            LOG.error("BK error writing entry for ledgerId: " + ledgerId + ", entryId: " + writtenEntryId + ", bookie: " + addr, (Throwable)BKException.create(writeRc));
                        }
                        ledgerFragmentEntryMcb.processResult(writeRc, null, null);
                    }
                };
                LedgerEntry entry = seq.nextElement();
                byte[] payload = entry.getEntry();
                ChannelBuffer toSend = readHandle.getDigestManager().computeDigestAndPackageForSending(entryId, readHandle.getLastAddConfirmed(), entry.getLength(), payload, 0, payload.length);
                BookKeeperAdmin.this.bkc.getBookieClient().addEntry(newBookie, readHandle.getId(), readHandle.getLedgerKey(), entryId, toSend, onWritten, null, 2);
            }
        };
        lh.asyncReadEntries(entryId, entryId, onRead, null);
    }

    class SingleFragmentCallback
    implements AsyncCallback.VoidCallback {
        final BookkeeperInternalCallbacks.MultiCallback ledgerFragmentsMcb;
        final LedgerHandle lh;
        final long fragmentStartId;
        final InetSocketAddress oldBookie;
        final InetSocketAddress newBookie;

        SingleFragmentCallback(BookkeeperInternalCallbacks.MultiCallback ledgerFragmentsMcb, LedgerHandle lh, long fragmentStartId, InetSocketAddress oldBookie, InetSocketAddress newBookie) {
            this.ledgerFragmentsMcb = ledgerFragmentsMcb;
            this.lh = lh;
            this.fragmentStartId = fragmentStartId;
            this.newBookie = newBookie;
            this.oldBookie = oldBookie;
        }

        @Override
        public void processResult(int rc, String path, Object ctx) {
            if (rc != KeeperException.Code.OK.intValue()) {
                LOG.error("BK error replicating ledger fragments for ledger: " + this.lh.getId(), (Throwable)BKException.create(rc));
                this.ledgerFragmentsMcb.processResult(rc, null, null);
                return;
            }
            ArrayList ensemble = (ArrayList)this.lh.getLedgerMetadata().getEnsembles().get(this.fragmentStartId);
            int deadBookieIndex = ensemble.indexOf(this.oldBookie);
            ensemble.remove(deadBookieIndex);
            ensemble.add(deadBookieIndex, this.newBookie);
            this.lh.writeLedgerConfig(new WriteCb(), null);
        }

        private class WriteCb
        implements AsyncCallback.StatCallback {
            private WriteCb() {
            }

            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                if (rc == KeeperException.Code.BADVERSION.intValue()) {
                    LOG.warn("Two fragments attempted update at once; ledger id: " + SingleFragmentCallback.this.lh.getId() + " startid: " + SingleFragmentCallback.this.fragmentStartId);
                    SingleFragmentCallback.this.lh.writeLedgerConfig(new WriteCb(), null);
                    return;
                }
                if (rc != KeeperException.Code.OK.intValue()) {
                    LOG.error("ZK error updating ledger config metadata for ledgerId: " + SingleFragmentCallback.this.lh.getId(), (Throwable)KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    SingleFragmentCallback.this.lh.getLedgerMetadata().updateZnodeStatus(stat);
                    LOG.info("Updated ZK for ledgerId: (" + SingleFragmentCallback.this.lh.getId() + " : " + SingleFragmentCallback.this.fragmentStartId + ") to point ledger fragments from old dead bookie: (" + SingleFragmentCallback.this.oldBookie + ") to new bookie: (" + SingleFragmentCallback.this.newBookie + ")");
                }
                SingleFragmentCallback.this.ledgerFragmentsMcb.processResult(rc, null, null);
            }
        }
    }

    /**
     * Monitor object used by the blocking recovery call: the waiter blocks on
     * it until {@code value} becomes {@code true}, then reads {@code rc}.
     */
    class SyncObject {
        /** Set to {@code true} once the operation has completed; starts {@code false}. */
        boolean value;
        /** Result code of the completed operation; starts at {@code 0}. */
        int rc;
    }
}

