/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A pending read of the contiguous entry range {@code [startEntryId, endEntryId]}
 * of one ledger.
 *
 * <p>One {@link LedgerEntryRequest} is created per entry and kept in {@link #seq}.
 * Each request is sent to one replica at a time; on error the next replica in
 * the entry's write set is tried. When a positive speculative-read timeout is
 * configured, a periodic task additionally sends reads to further replicas
 * for entries that have not completed yet.
 *
 * <p>Once finished, the operation itself is handed to the callback as the
 * {@link Enumeration} of the entries read.
 *
 * <p>The user callback is invoked at most once per operation, with the first
 * result (success or failure) that is reported.
 */
class PendingReadOp
implements Enumeration<LedgerEntry>,
BookkeeperInternalCallbacks.ReadEntryCallback {
    Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
    /** Delay and period, in milliseconds, of the speculative-read task; values &lt;= 0 disable it. */
    final int speculativeReadTimeout;
    private final ScheduledExecutorService scheduler;
    /** Handle of the periodic speculative-read task, or {@code null} when none is scheduled. */
    private ScheduledFuture<?> speculativeTask = null;
    /** Guards the user callback so that it fires at most once. */
    private final AtomicBoolean callbackTriggered = new AtomicBoolean(false);
    /** One request per entry, in ascending entry-id order. */
    Queue<LedgerEntryRequest> seq;
    /** Bookies from which at least one successful (rc == 0) response has been received. */
    Set<InetSocketAddress> heardFromHosts;
    AsyncCallback.ReadCallback cb;
    Object ctx;
    LedgerHandle lh;
    /** Number of entries in the range that have not completed yet. */
    long numPendingEntries;
    long startEntryId;
    long endEntryId;
    /** Write quorum size minus ack quorum size of the ledger. */
    final int maxMissedReadsAllowed;

    /**
     * Creates a read operation for the inclusive entry range
     * {@code [startEntryId, endEntryId]}. Nothing is sent until {@link #initiate()}.
     *
     * @param lh           handle of the ledger to read from
     * @param scheduler    executor on which the speculative-read task is scheduled
     * @param startEntryId first entry id to read (inclusive)
     * @param endEntryId   last entry id to read (inclusive)
     * @param cb           callback notified when the operation finishes
     * @param ctx          opaque context object passed back to {@code cb}
     */
    PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler, long startEntryId, long endEntryId, AsyncCallback.ReadCallback cb, Object ctx) {
        this.seq = new ArrayBlockingQueue<LedgerEntryRequest>((int)(endEntryId + 1L - startEntryId));
        this.cb = cb;
        this.ctx = ctx;
        // lh must be assigned before getLedgerMetadata() is called below.
        this.lh = lh;
        this.startEntryId = startEntryId;
        this.endEntryId = endEntryId;
        this.scheduler = scheduler;
        this.numPendingEntries = endEntryId - startEntryId + 1L;
        this.maxMissedReadsAllowed = this.getLedgerMetadata().getWriteQuorumSize() - this.getLedgerMetadata().getAckQuorumSize();
        this.speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
        // The set is added to from readEntryComplete() and iterated by the
        // speculative-read task, which runs on the scheduler. A plain HashSet
        // could throw ConcurrentModificationException there, and an exception
        // escaping a fixed-delay task suppresses all its further executions.
        // NOTE(review): assumes scheduler and callback threads can differ.
        this.heardFromHosts = Collections.newSetFromMap(new ConcurrentHashMap<InetSocketAddress, Boolean>());
    }

    /**
     * Returns the metadata of the ledger being read.
     *
     * @return {@code lh.metadata}
     */
    protected LedgerMetadata getLedgerMetadata() {
        return this.lh.metadata;
    }

    /**
     * Starts the operation: schedules the speculative-read task (when enabled)
     * and sends the first read for every entry in the range.
     *
     * @throws InterruptedException declared for callers; interrupts raised while
     *         sending are handled inside {@link LedgerEntryRequest#sendNextRead()}
     */
    public void initiate() throws InterruptedException {
        long nextEnsembleChange = this.startEntryId;
        long i = this.startEntryId;
        ArrayList<InetSocketAddress> ensemble = null;
        if (this.speculativeReadTimeout > 0) {
            this.speculativeTask = this.scheduler.scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    // Count how many speculative reads were actually sent this round.
                    int x = 0;
                    for (LedgerEntryRequest r : PendingReadOp.this.seq) {
                        if (r.isComplete() || null == r.maybeSendSpeculativeRead(PendingReadOp.this.heardFromHosts)) continue;
                        PendingReadOp.this.LOG.debug("Send speculative read for {}. Hosts heard are {}.", (Object)r, PendingReadOp.this.heardFromHosts);
                        ++x;
                    }
                    if (x > 0) {
                        PendingReadOp.this.LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.", new Object[]{x, PendingReadOp.this.lh.getId(), PendingReadOp.this.startEntryId, PendingReadOp.this.endEntryId, PendingReadOp.this.heardFromHosts});
                    }
                }
            }, this.speculativeReadTimeout, this.speculativeReadTimeout, TimeUnit.MILLISECONDS);
        }
        do {
            // Re-resolve the ensemble only when the entry id reaches the next change point.
            if (i == nextEnsembleChange) {
                ensemble = this.getLedgerMetadata().getEnsemble(i);
                nextEnsembleChange = this.getLedgerMetadata().getNextEnsembleChange(i);
            }
            LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, this.lh.ledgerId, i);
            this.seq.add(entry);
            entry.sendNextRead();
        } while (++i <= this.endEntryId);
    }

    /**
     * Acquires the ledger handle's throttler and then issues an asynchronous
     * read of {@code entry} to bookie {@code to}, with this operation as callback.
     *
     * @param to    bookie to read from
     * @param entry request the read belongs to
     * @throws InterruptedException if interrupted while acquiring the throttler
     */
    void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
        this.lh.throttler.acquire();
        this.lh.bk.bookieClient.readEntry(to, this.lh.ledgerId, entry.entryId, this, new ReadContext(to, entry));
    }

    /**
     * Handles the response to a single read request.
     *
     * <p>A non-zero {@code rc} is recorded on the entry request, which may then
     * try another replica. A zero {@code rc} marks the bookie as heard from and
     * attempts to complete the entry; the user callback is submitted with
     * code 0 when the last pending entry completes.
     *
     * @param rc       result code of the read; 0 is treated as success
     * @param ledgerId id of the ledger that was read
     * @param entryId  id of the entry that was read
     * @param buffer   response payload
     * @param ctx      the {@link ReadContext} supplied when the read was sent
     */
    @Override
    public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
        ReadContext rctx = (ReadContext)ctx;
        LedgerEntryRequest entry = rctx.entry;
        if (rc != 0) {
            entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
            return;
        }
        this.heardFromHosts.add(rctx.to);
        if (entry.complete(rctx.to, buffer)) {
            --this.numPendingEntries;
            if (this.numPendingEntries == 0L) {
                this.submitCallback(0);
            }
        }
        if (this.numPendingEntries < 0L) {
            this.LOG.error("Read too many values");
        }
    }

    /**
     * Reports the outcome of the whole operation to the user callback.
     *
     * <p>Only the first invocation has any effect. Without this guard the
     * callback could fire repeatedly, because this method is reached from the
     * success path, from every entry whose replicas are exhausted, and from the
     * interrupt path.
     *
     * @param code result code handed to the callback; 0 means success
     */
    private void submitCallback(int code) {
        if (!this.callbackTriggered.compareAndSet(false, true)) {
            // The outcome has already been reported; never call back twice.
            return;
        }
        if (this.speculativeTask != null) {
            this.speculativeTask.cancel(true);
            this.speculativeTask = null;
        }
        if (code != 0) {
            // Find the first entry that was not read, for diagnostics only.
            long firstUnread = -1L;
            for (LedgerEntryRequest req : this.seq) {
                if (req.isComplete()) continue;
                firstUnread = req.getEntryId();
                break;
            }
            this.LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}. First unread entry is {}", new Object[]{this.lh.getId(), this.startEntryId, this.endEntryId, this.heardFromHosts, firstUnread});
        }
        this.cb.readComplete(code, this.lh, this, this.ctx);
    }

    /**
     * @return {@code true} while entry requests remain in the queue
     */
    @Override
    public boolean hasMoreElements() {
        return !this.seq.isEmpty();
    }

    /**
     * Removes and returns the head of the request queue.
     *
     * @return the next entry
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public LedgerEntry nextElement() throws NoSuchElementException {
        return this.seq.remove();
    }

    /**
     * @return the number of entry requests currently in the queue
     */
    public int size() {
        return this.seq.size();
    }

    /** Per-request context: the bookie a read was sent to and the entry request it belongs to. */
    private static class ReadContext {
        final InetSocketAddress to;
        final LedgerEntryRequest entry;

        ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
            this.to = to;
            this.entry = entry;
        }
    }

    /**
     * Read state of a single entry: which replicas of its write set have been
     * sent a read, which of them returned an error, and whether the entry has
     * been read successfully.
     */
    class LedgerEntryRequest
    extends LedgerEntry {
        /** Index value meaning "host is not part of the ensemble / write set". */
        static final int NOT_FOUND = -1;
        /** Position in the write set of the next replica to send a read to. */
        int nextReplicaIndexToReadFrom;
        /** Set once the entry data has been accepted from some replica. */
        AtomicBoolean complete;
        /** Error code reported for this entry if every replica fails; 0 while no error was seen. */
        int firstError;
        /** Number of responses with code -13, logged as "no such entry". */
        int numMissedEntryReads;
        final ArrayList<InetSocketAddress> ensemble;
        /** Ensemble indices of the bookies in this entry's write set. */
        final List<Integer> writeSet;
        /** Write-set positions to which a read has been sent. */
        final BitSet sentReplicas;
        /** Write-set positions from which an error response has been received. */
        final BitSet erroredReplicas;

        /**
         * @param ensemble bookie ensemble used for this entry
         * @param lId      ledger id
         * @param eId      entry id
         */
        LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
            super(lId, eId);
            this.nextReplicaIndexToReadFrom = 0;
            this.complete = new AtomicBoolean(false);
            this.firstError = 0;
            this.numMissedEntryReads = 0;
            this.ensemble = ensemble;
            this.writeSet = PendingReadOp.this.lh.distributionSchedule.getWriteSet(this.entryId);
            this.sentReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
            this.erroredReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
        }

        /**
         * Maps a bookie address to its position in this entry's write set.
         *
         * @param host bookie address
         * @return position in the write set, or {@link #NOT_FOUND} if the host
         *         is not in the ensemble or not in the write set
         */
        private int getReplicaIndex(InetSocketAddress host) {
            int bookieIndex = this.ensemble.indexOf(host);
            if (bookieIndex == NOT_FOUND) {
                return NOT_FOUND;
            }
            return this.writeSet.indexOf(bookieIndex);
        }

        /**
         * @return bit set, indexed by ensemble position, of the bookies a read
         *         has been sent to
         */
        private BitSet getSentToBitSet() {
            BitSet b = new BitSet(this.ensemble.size());
            for (int i = 0; i < this.sentReplicas.length(); ++i) {
                if (!this.sentReplicas.get(i)) continue;
                // Translate write-set position to ensemble position.
                b.set(this.writeSet.get(i));
            }
            return b;
        }

        /**
         * @param heardFromHosts bookies that have responded successfully
         * @return bit set, indexed by ensemble position, of those hosts that
         *         belong to this entry's ensemble
         */
        private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
            BitSet b = new BitSet(this.ensemble.size());
            for (InetSocketAddress i : heardFromHosts) {
                int index = this.ensemble.indexOf(i);
                if (index == -1) continue;
                b.set(index);
            }
            return b;
        }

        /**
         * @return {@code true} if more reads have been sent than error
         *         responses have been received
         */
        private boolean readsOutstanding() {
            return this.sentReplicas.cardinality() - this.erroredReplicas.cardinality() > 0;
        }

        /**
         * Sends a read to the next replica unless every replica of the write
         * quorum has already been tried, or a read was already sent to a bookie
         * that has been heard from.
         *
         * @param heardFromHosts bookies that have responded successfully
         * @return the bookie the speculative read was sent to, or {@code null}
         *         if none was sent
         */
        synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                return null;
            }
            BitSet sentTo = this.getSentToBitSet();
            BitSet heardFrom = this.getHeardFromBitSet(heardFromHosts);
            // Intersection: bookies we both sent to and have heard from.
            sentTo.and(heardFrom);
            if (sentTo.cardinality() == 0) {
                return this.sendNextRead();
            }
            return null;
        }

        /**
         * Sends a read to the next untried replica of the write set.
         *
         * <p>When every replica has been tried, the operation is failed with
         * {@code firstError}. Before that, a {@code firstError} of -8 is
         * replaced by -13 if more than {@code maxMissedReadsAllowed} replicas
         * reported -13.
         * NOTE(review): -8 is presumably a bookie-unavailable code — confirm
         * against {@code BKException.Code}.
         *
         * @return the bookie the read was sent to, or {@code null} if no read
         *         was sent (replicas exhausted or the thread was interrupted)
         */
        synchronized InetSocketAddress sendNextRead() {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                if (-8 == this.firstError && this.numMissedEntryReads > PendingReadOp.this.maxMissedReadsAllowed) {
                    this.firstError = -13;
                }
                PendingReadOp.this.submitCallback(this.firstError);
                return null;
            }
            int replica = this.nextReplicaIndexToReadFrom;
            int bookieIndex = PendingReadOp.this.lh.distributionSchedule.getWriteSet(this.entryId).get(this.nextReplicaIndexToReadFrom);
            ++this.nextReplicaIndexToReadFrom;
            try {
                InetSocketAddress to = this.ensemble.get(bookieIndex);
                PendingReadOp.this.sendReadTo(to, this);
                this.sentReplicas.set(replica);
                return to;
            }
            catch (InterruptedException ie) {
                this.LOG.error("Interrupted reading entry " + this, (Throwable)ie);
                // Preserve the interrupt status for the caller, then fail the operation.
                Thread.currentThread().interrupt();
                PendingReadOp.this.submitCallback(-1);
                return null;
            }
        }

        /**
         * Records an error response from {@code host} and, if no other read for
         * this entry is outstanding, tries the next replica.
         *
         * <p>{@code firstError} takes {@code rc} when it is still 0 or -13, or
         * when it is -8 and {@code rc} is not -13; otherwise it is kept.
         *
         * @param host   bookie that returned the error
         * @param errMsg message prefix used for the debug log
         * @param rc     error code returned by the bookie
         */
        synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
            if (0 == this.firstError || -13 == this.firstError) {
                this.firstError = rc;
            } else if (-8 == this.firstError && -13 != rc) {
                this.firstError = rc;
            }
            if (-13 == rc) {
                ++this.numMissedEntryReads;
                this.LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}", new Object[]{PendingReadOp.this.lh.ledgerId, this.entryId, host});
            } else {
                this.LOG.debug(errMsg + " while reading L{} E{} from bookie: {}", new Object[]{PendingReadOp.this.lh.ledgerId, this.entryId, host});
            }
            int replica = this.getReplicaIndex(host);
            if (replica == NOT_FOUND) {
                this.LOG.error("Received error from a host which is not in the ensemble {} {}.", (Object)host, this.ensemble);
                return;
            }
            this.erroredReplicas.set(replica);
            if (!this.readsOutstanding()) {
                this.sendNextRead();
            }
        }

        /**
         * Verifies the digest of a response and, if this is the first valid
         * response for the entry, stores its data and length.
         *
         * @param host   bookie the response came from
         * @param buffer response payload
         * @return {@code true} only for the first valid response; {@code false}
         *         on a digest mismatch (handled as error code -5) or if the
         *         entry was already complete
         */
        boolean complete(InetSocketAddress host, ChannelBuffer buffer) {
            ChannelBufferInputStream is;
            try {
                is = PendingReadOp.this.lh.macManager.verifyDigestAndReturnData(this.entryId, buffer);
            }
            catch (BKException.BKDigestMatchException e) {
                this.logErrorAndReattemptRead(host, "Mac mismatch", -5);
                return false;
            }
            if (!this.complete.getAndSet(true)) {
                this.entryDataStream = is;
                // NOTE(review): offset 24 is presumably the length field of the
                // entry header — confirm against the digest/packet layout.
                this.length = buffer.getLong(24);
                return true;
            }
            return false;
        }

        /**
         * @return {@code true} once the entry has been read successfully
         */
        boolean isComplete() {
            return this.complete.get();
        }

        @Override
        public String toString() {
            return String.format("L%d-E%d", this.ledgerId, this.entryId);
        }
    }
}

