/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import bk-shade.com.google.common.annotations.VisibleForTesting;
import bk-shade.com.google.common.collect.Sets;
import bk-shade.com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.BookiesListener;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Auditor
implements BookiesListener {
    private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
    private final ServerConfiguration conf;
    private BookKeeper bkc;
    private BookKeeperAdmin admin;
    private BookieLedgerIndexer bookieLedgerIndexer;
    private LedgerManager ledgerManager;
    private LedgerUnderreplicationManager ledgerUnderreplicationManager;
    private final ScheduledExecutorService executor;
    private List<String> knownBookies = new ArrayList<String>();
    private final String bookieIdentifier;
    private final Runnable BOOKIE_CHECK = new Runnable(){

        @Override
        public void run() {
            try {
                Auditor.this.auditBookies();
            }
            catch (BKException bke) {
                LOG.error("Couldn't get bookie list, exiting", (Throwable)bke);
                Auditor.this.submitShutdownTask();
            }
            catch (KeeperException ke) {
                LOG.error("Exception while watching available bookies", (Throwable)ke);
                Auditor.this.submitShutdownTask();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
                Auditor.this.submitShutdownTask();
            }
            catch (ReplicationException.BKAuditException bke) {
                LOG.error("Exception while watching available bookies", (Throwable)bke);
                Auditor.this.submitShutdownTask();
            }
        }
    };

    public Auditor(final String bookieIdentifier, ServerConfiguration conf, ZooKeeper zkc) throws ReplicationException.UnavailableException {
        this.conf = conf;
        this.bookieIdentifier = bookieIdentifier;
        this.initialize(conf, zkc);
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
                t.setDaemon(true);
                return t;
            }
        });
    }

    private void initialize(ServerConfiguration conf, ZooKeeper zkc) throws ReplicationException.UnavailableException {
        try {
            LedgerManagerFactory ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
            this.ledgerManager = ledgerManagerFactory.newLedgerManager();
            this.bookieLedgerIndexer = new BookieLedgerIndexer(this.ledgerManager);
            this.ledgerUnderreplicationManager = ledgerManagerFactory.newLedgerUnderreplicationManager();
            this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc);
            this.admin = new BookKeeperAdmin(this.bkc);
        }
        catch (ReplicationException.CompatibilityException ce) {
            throw new ReplicationException.UnavailableException("CompatibilityException while initializing Auditor", ce);
        }
        catch (IOException ioe) {
            throw new ReplicationException.UnavailableException("IOException while initializing Auditor", ioe);
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("KeeperException while initializing Auditor", ke);
        }
        catch (InterruptedException ie) {
            throw new ReplicationException.UnavailableException("Interrupted while initializing Auditor", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitShutdownTask() {
        Auditor auditor = this;
        synchronized (auditor) {
            if (this.executor.isShutdown()) {
                return;
            }
            this.executor.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Auditor auditor = Auditor.this;
                    synchronized (auditor) {
                        Auditor.this.executor.shutdown();
                    }
                }
            });
        }
    }

    @VisibleForTesting
    synchronized Future<?> submitAuditTask() {
        if (this.executor.isShutdown()) {
            SettableFuture f = SettableFuture.create();
            f.setException(new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        return this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    Auditor.this.waitIfLedgerReplicationDisabled();
                    List availableBookies = Auditor.this.getAvailableBookies();
                    Collection newBookies = CollectionUtils.subtract((Collection)availableBookies, (Collection)Auditor.this.knownBookies);
                    Auditor.this.knownBookies.addAll(newBookies);
                    Collection lostBookies = CollectionUtils.subtract((Collection)Auditor.this.knownBookies, (Collection)availableBookies);
                    if (lostBookies.size() > 0) {
                        Auditor.this.knownBookies.removeAll(lostBookies);
                        Auditor.this.auditBookies();
                    }
                }
                catch (BKException bke) {
                    LOG.error("Exception getting bookie list", (Throwable)bke);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
                }
                catch (ReplicationException.BKAuditException bke) {
                    LOG.error("Exception while watching available bookies", (Throwable)bke);
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Exception while watching available bookies", (Throwable)ue);
                }
                catch (KeeperException ke) {
                    LOG.error("Exception reading bookie list", (Throwable)ke);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        LOG.info("I'm starting as Auditor Bookie. ID: {}", (Object)this.bookieIdentifier);
        Auditor auditor = this;
        synchronized (auditor) {
            if (this.executor.isShutdown()) {
                return;
            }
            long interval = this.conf.getAuditorPeriodicCheckInterval();
            if (interval > 0L) {
                LOG.info("Auditor periodic ledger checking enabled 'auditorPeriodicCheckInterval' {} seconds", (Object)interval);
                this.executor.scheduleAtFixedRate(new Runnable(){

                    @Override
                    public void run() {
                        LOG.info("Running periodic check");
                        try {
                            if (!Auditor.this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                                LOG.info("Ledger replication disabled, skipping");
                                return;
                            }
                            Auditor.this.checkAllLedgers();
                        }
                        catch (KeeperException ke) {
                            LOG.error("Exception while running periodic check", (Throwable)ke);
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            LOG.error("Interrupted while running periodic check", (Throwable)ie);
                        }
                        catch (ReplicationException.BKAuditException bkae) {
                            LOG.error("Exception while running periodic check", (Throwable)bkae);
                        }
                        catch (BKException bke) {
                            LOG.error("Exception running periodic check", (Throwable)bke);
                        }
                        catch (IOException ioe) {
                            LOG.error("I/O exception running periodic check", (Throwable)ioe);
                        }
                        catch (ReplicationException.UnavailableException ue) {
                            LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
                        }
                    }
                }, interval, interval, TimeUnit.SECONDS);
            } else {
                LOG.info("Periodic checking disabled");
            }
            try {
                this.knownBookies = this.getAvailableBookies();
            }
            catch (BKException bke) {
                LOG.error("Couldn't get bookie list, exiting", (Throwable)bke);
                this.submitShutdownTask();
            }
            long bookieCheckInterval = this.conf.getAuditorPeriodicBookieCheckInterval();
            if (bookieCheckInterval == 0L) {
                LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
                this.executor.submit(this.BOOKIE_CHECK);
            } else {
                LOG.info("Auditor periodic bookie checking enabled 'auditorPeriodicBookieCheckInterval' {} seconds", (Object)bookieCheckInterval);
                this.executor.scheduleAtFixedRate(this.BOOKIE_CHECK, 0L, bookieCheckInterval, TimeUnit.SECONDS);
            }
        }
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb cb = new ReplicationEnableCb();
        if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
            this.ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }

    private List<String> getAvailableBookies() throws BKException {
        this.admin.notifyBookiesChanged(this);
        Collection<InetSocketAddress> availableBkAddresses = this.admin.getAvailableBookies();
        Collection<InetSocketAddress> readOnlyBkAddresses = this.admin.getReadOnlyBookies();
        availableBkAddresses.addAll(readOnlyBkAddresses);
        ArrayList<String> availableBookies = new ArrayList<String>();
        for (InetSocketAddress addr : availableBkAddresses) {
            availableBookies.add(StringUtils.addrToString(addr));
        }
        return availableBookies;
    }

    private void auditBookies() throws ReplicationException.BKAuditException, KeeperException, InterruptedException, BKException {
        try {
            this.waitIfLedgerReplicationDisabled();
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            return;
        }
        Map<String, Set<Long>> ledgerDetails = this.generateBookie2LedgersIndex();
        try {
            if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                this.executor.submit(this.BOOKIE_CHECK);
                return;
            }
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            return;
        }
        List<String> availableBookies = this.getAvailableBookies();
        Set<String> knownBookies = ledgerDetails.keySet();
        Collection lostBookies = CollectionUtils.subtract(knownBookies, availableBookies);
        if (lostBookies.size() > 0) {
            this.handleLostBookies(lostBookies, ledgerDetails);
        }
    }

    private Map<String, Set<Long>> generateBookie2LedgersIndex() throws ReplicationException.BKAuditException {
        return this.bookieLedgerIndexer.getBookieToLedgerIndex();
    }

    private void handleLostBookies(Collection<String> lostBookies, Map<String, Set<Long>> ledgerDetails) throws ReplicationException.BKAuditException {
        LOG.info("Following are the failed bookies: " + lostBookies + " and searching its ledgers for re-replication");
        for (String bookieIP : lostBookies) {
            this.publishSuspectedLedgers(bookieIP, ledgerDetails.get(bookieIP));
        }
    }

    private void publishSuspectedLedgers(String bookieIP, Set<Long> ledgers) throws ReplicationException.BKAuditException {
        if (null == ledgers || ledgers.size() == 0) {
            LOG.info("There is no ledgers for the failed bookie: " + bookieIP);
            return;
        }
        LOG.info("Following ledgers: " + ledgers + " of bookie: " + bookieIP + " are identified as underreplicated");
        for (Long ledgerId : ledgers) {
            try {
                this.ledgerUnderreplicationManager.markLedgerUnderreplicated(ledgerId, bookieIP);
            }
            catch (ReplicationException.UnavailableException ue) {
                throw new ReplicationException.BKAuditException("Failed to publish underreplicated ledger: " + ledgerId + " of bookie: " + bookieIP, ue);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void checkAllLedgers() throws ReplicationException.BKAuditException, BKException, IOException, InterruptedException, KeeperException {
        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(this.conf.getZkTimeout());
        ZooKeeper newzk = ZkUtils.createConnectedZookeeperClient(this.conf.getZkServers(), w);
        BookKeeper client = new BookKeeper(new ClientConfiguration(this.conf), newzk);
        final BookKeeperAdmin admin = new BookKeeperAdmin(client);
        try {
            final LedgerChecker checker = new LedgerChecker(client);
            final AtomicInteger returnCode = new AtomicInteger(0);
            final CountDownLatch processDone = new CountDownLatch(1);
            BookkeeperInternalCallbacks.Processor<Long> checkLedgersProcessor = new BookkeeperInternalCallbacks.Processor<Long>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void process(Long ledgerId, AsyncCallback.VoidCallback callback) {
                    try {
                        if (!Auditor.this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                            LOG.info("Ledger rereplication has been disabled, aborting periodic check");
                            processDone.countDown();
                            return;
                        }
                    }
                    catch (ReplicationException.UnavailableException ue) {
                        LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
                        processDone.countDown();
                        return;
                    }
                    LedgerHandle lh = null;
                    try {
                        lh = admin.openLedgerNoRecovery(ledgerId);
                        checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback));
                    }
                    catch (BKException.BKNoSuchLedgerExistsException bknsle) {
                        LOG.debug("Ledger was deleted before we could check it", (Throwable)bknsle);
                        callback.processResult(0, null, null);
                        return;
                    }
                    catch (BKException bke) {
                        LOG.error("Couldn't open ledger " + ledgerId, (Throwable)bke);
                        callback.processResult(-8, null, null);
                        return;
                    }
                    catch (InterruptedException ie) {
                        LOG.error("Interrupted opening ledger", (Throwable)ie);
                        Thread.currentThread().interrupt();
                        callback.processResult(-15, null, null);
                        return;
                    }
                    finally {
                        if (lh != null) {
                            try {
                                lh.close();
                            }
                            catch (BKException bke) {
                                LOG.warn("Couldn't close ledger " + ledgerId, (Throwable)bke);
                            }
                            catch (InterruptedException ie) {
                                LOG.warn("Interrupted closing ledger " + ledgerId, (Throwable)ie);
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            this.ledgerManager.asyncProcessLedgers(checkLedgersProcessor, new AsyncCallback.VoidCallback(){

                public void processResult(int rc, String s, Object obj) {
                    returnCode.set(rc);
                    processDone.countDown();
                }
            }, null, 0, -1);
            try {
                processDone.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.BKAuditException("Exception while checking ledgers", e);
            }
            if (returnCode.get() != 0) {
                throw BKException.create(returnCode.get());
            }
        }
        finally {
            admin.close();
            client.close();
            newzk.close();
        }
    }

    @Override
    public void availableBookiesChanged() {
        this.submitAuditTask();
    }

    public void shutdown() {
        LOG.info("Shutting down auditor");
        this.submitShutdownTask();
        try {
            while (!this.executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.warn("Executor not shutting down, interrupting");
                this.executor.shutdownNow();
            }
            this.admin.close();
            this.bkc.close();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while shutting down auditor bookie", (Throwable)ie);
        }
        catch (BKException bke) {
            LOG.warn("Exception while shutting down auditor bookie", (Throwable)bke);
        }
    }

    public boolean isRunning() {
        return !this.executor.isShutdown();
    }

    private class ProcessLostFragmentsCb
    implements BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>> {
        final LedgerHandle lh;
        final AsyncCallback.VoidCallback callback;

        ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
            this.lh = lh;
            this.callback = callback;
        }

        @Override
        public void operationComplete(int rc, Set<LedgerFragment> fragments) {
            block9: {
                try {
                    if (rc == 0) {
                        HashSet<InetSocketAddress> bookies = Sets.newHashSet();
                        for (LedgerFragment f : fragments) {
                            bookies.add(f.getAddress());
                        }
                        for (InetSocketAddress bookie : bookies) {
                            Auditor.this.publishSuspectedLedgers(StringUtils.addrToString(bookie), Sets.newHashSet(this.lh.getId()));
                        }
                    }
                    this.lh.close();
                }
                catch (BKException bke) {
                    LOG.error("Error closing lh", (Throwable)bke);
                    if (rc == 0) {
                        rc = -200;
                    }
                }
                catch (InterruptedException ie) {
                    LOG.error("Interrupted publishing suspected ledger", (Throwable)ie);
                    Thread.currentThread().interrupt();
                    if (rc == 0) {
                        rc = -15;
                    }
                }
                catch (ReplicationException.BKAuditException bkae) {
                    LOG.error("Auditor exception publishing suspected ledger", (Throwable)bkae);
                    if (rc != 0) break block9;
                    rc = -200;
                }
            }
            this.callback.processResult(rc, null, null);
        }
    }
}

