/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.ReadResponseResolver;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsistencyChecker
implements Runnable {
    private static Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class);
    private static ScheduledExecutorService executor_ = new ScheduledThreadPoolExecutor(1);
    private final Row row_;
    protected final List<InetAddress> replicas_;
    private final ReadCommand readCommand_;
    private final InetAddress dataSource;

    public ConsistencyChecker(ReadCommand command, Row row, List<InetAddress> endpoints, InetAddress dataSource) {
        this.row_ = row;
        this.replicas_ = endpoints;
        this.readCommand_ = command;
        this.dataSource = dataSource;
    }

    @Override
    public void run() {
        ReadCommand readCommandDigestOnly = this.constructReadMessage(true);
        try {
            Message message = readCommandDigestOnly.makeReadMessage();
            if (logger_.isDebugEnabled()) {
                logger_.debug("Reading consistency digest for " + this.readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(this.replicas_, ", ") + "]");
            }
            MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
            for (InetAddress endpoint : this.replicas_) {
                if (endpoint.equals(this.dataSource)) continue;
                MessagingService.instance.sendOneWay(message, endpoint);
            }
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    private ReadCommand constructReadMessage(boolean isDigestQuery) {
        ReadCommand readCommand = this.readCommand_.copy();
        readCommand.setDigestQuery(isDigestQuery);
        return readCommand;
    }

    class DataRepairHandler
    implements IAsyncCallback {
        private final ReadResponseResolver readResponseResolver_;
        private final int majority_;

        public DataRepairHandler() throws IOException {
            this.readResponseResolver_ = new ReadResponseResolver(((ConsistencyChecker)ConsistencyChecker.this).readCommand_.table, ((ConsistencyChecker)ConsistencyChecker.this).readCommand_.key);
            this.majority_ = ConsistencyChecker.this.replicas_.size() / 2 + 1;
            ReadResponse readResponse = new ReadResponse(ConsistencyChecker.this.row_);
            Message fakeMessage = new Message(ConsistencyChecker.this.dataSource, StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
            this.readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
        }

        @Override
        public synchronized void response(Message message) {
            if (logger_.isDebugEnabled()) {
                logger_.debug("Received response in DataRepairHandler : " + message.toString());
            }
            this.readResponseResolver_.preprocess(message);
            if (this.readResponseResolver_.getMessageCount() == this.majority_) {
                WrappedRunnable runnable = new WrappedRunnable(){

                    @Override
                    public void runMayThrow() throws IOException, DigestMismatchException {
                        DataRepairHandler.this.readResponseResolver_.resolve();
                    }
                };
                executor_.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
            }
        }
    }

    class DigestResponseHandler
    implements IAsyncCallback {
        private boolean repairInvoked;
        private final ByteBuffer localDigest;

        DigestResponseHandler() {
            this.localDigest = ColumnFamily.digest(((ConsistencyChecker)ConsistencyChecker.this).row_.cf);
        }

        @Override
        public synchronized void response(Message response) {
            if (this.repairInvoked) {
                return;
            }
            try {
                byte[] body = response.getMessageBody();
                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                ByteBuffer digest = result.digest();
                if (!this.localDigest.equals(digest)) {
                    ReadCommand readCommand = ConsistencyChecker.this.constructReadMessage(false);
                    Message message = readCommand.makeReadMessage();
                    if (logger_.isDebugEnabled()) {
                        logger_.debug("Digest mismatch; re-reading " + ((ConsistencyChecker)ConsistencyChecker.this).readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(ConsistencyChecker.this.replicas_, ", ") + "]");
                    }
                    MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
                    for (InetAddress endpoint : ConsistencyChecker.this.replicas_) {
                        if (endpoint.equals(ConsistencyChecker.this.dataSource)) continue;
                        MessagingService.instance.sendOneWay(message, endpoint);
                    }
                    this.repairInvoked = true;
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error handling responses for " + ConsistencyChecker.this.row_, e);
            }
        }
    }
}

