package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Future;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flume/channel/file/FlumeEventQueue.class */
public final class FlumeEventQueue {
    private static final Logger LOG = LoggerFactory.getLogger(FlumeEventQueue.class);
    private static final int EMPTY = 0;
    private final EventQueueBackingStore backingStore;
    private final String channelNameDescriptor;
    private final InflightEventWrapper inflightTakes;
    private final InflightEventWrapper inflightPuts;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flume/channel/file/FlumeEventQueue$InflightEventWrapper.class */
    public class InflightEventWrapper {
        private volatile RandomAccessFile file;
        private volatile java.nio.channels.FileChannel fileChannel;
        private final MessageDigest digest;
        private volatile Future<?> future;
        private final File inflightEventsFile;
        private SetMultimap<Long, Long> inflightEvents = HashMultimap.create();
        private volatile boolean syncRequired = false;
        private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();

        public InflightEventWrapper(File file) throws Exception {
            if (!file.exists()) {
                Preconditions.checkState(file.createNewFile(), "Could notcreate inflight events file: " + file.getCanonicalPath());
            }
            this.inflightEventsFile = file;
            this.file = new RandomAccessFile(file, "rw");
            this.fileChannel = this.file.getChannel();
            this.digest = MessageDigest.getInstance("MD5");
        }

        public boolean completeTransaction(Long l) {
            if (!this.inflightEvents.containsKey(l)) {
                return false;
            }
            this.inflightEvents.removeAll(l);
            this.inflightFileIDs.removeAll(l);
            this.syncRequired = true;
            return true;
        }

        public void addEvent(Long l, Long l2) {
            this.inflightEvents.put(l, l2);
            this.inflightFileIDs.put(l, Integer.valueOf(FlumeEventPointer.fromLong(l2.longValue()).getFileID()));
            this.syncRequired = true;
        }

        public void serializeAndWrite() throws Exception {
            Collection values = this.inflightEvents.values();
            if (!this.fileChannel.isOpen()) {
                this.file = new RandomAccessFile(this.inflightEventsFile, "rw");
                this.fileChannel = this.file.getChannel();
            }
            if (values.isEmpty()) {
                this.file.setLength(0L);
            }
            try {
                int size = (((this.inflightEvents.keySet().size() * 2) + values.size()) * 8) + 16;
                this.file.setLength(size);
                Preconditions.checkState(this.file.length() == ((long) size), "Expected File size of inflight events file does not match the current file size. Checkpoint is incomplete.");
                this.file.seek(0L);
                ByteBuffer allocate = ByteBuffer.allocate(size);
                LongBuffer asLongBuffer = allocate.asLongBuffer();
                for (Long l : this.inflightEvents.keySet()) {
                    Set set = this.inflightEvents.get(l);
                    asLongBuffer.put(l.longValue());
                    asLongBuffer.put(set.size());
                    FlumeEventQueue.LOG.debug("Number of events inserted into inflights file: " + String.valueOf(set.size()) + " file: " + this.inflightEventsFile.getCanonicalPath());
                    asLongBuffer.put(ArrayUtils.toPrimitive((Long[]) set.toArray(new Long[0])));
                }
                this.file.write(this.digest.digest(allocate.array()));
                allocate.position(0);
                this.fileChannel.write(allocate);
                this.fileChannel.force(true);
                this.syncRequired = false;
            } catch (IOException e) {
                FlumeEventQueue.LOG.error("Error while writing checkpoint to disk.", e);
                throw e;
            }
        }

        public SetMultimap<Long, Long> deserialize() throws IOException, BadCheckpointException {
            HashMultimap create = HashMultimap.create();
            if (!this.fileChannel.isOpen()) {
                this.file = new RandomAccessFile(this.inflightEventsFile, "rw");
                this.fileChannel = this.file.getChannel();
            }
            if (this.file.length() == 0) {
                return create;
            }
            this.file.seek(0L);
            byte[] bArr = new byte[16];
            this.file.read(bArr);
            ByteBuffer allocate = ByteBuffer.allocate((int) (this.file.length() - this.file.getFilePointer()));
            this.fileChannel.read(allocate);
            if (!Arrays.equals(bArr, this.digest.digest(allocate.array()))) {
                throw new BadCheckpointException("Checksum of inflights file differs from the checksum expected.");
            }
            allocate.position(0);
            LongBuffer asLongBuffer = allocate.asLongBuffer();
            while (true) {
                try {
                    long j = asLongBuffer.get();
                    int i = (int) asLongBuffer.get();
                    for (int i2 = 0; i2 < i; i2++) {
                        create.put(Long.valueOf(j), Long.valueOf(asLongBuffer.get()));
                    }
                } catch (BufferUnderflowException e) {
                    FlumeEventQueue.LOG.debug("Reached end of inflights buffer. Long buffer position =" + String.valueOf(asLongBuffer.position()));
                    return create;
                }
            }
        }

        public int getSize() {
            return this.inflightEvents.size();
        }

        public boolean syncRequired() {
            return this.syncRequired;
        }

        public Collection<Integer> getFileIDs() {
            return this.inflightFileIDs.values();
        }

        public Collection<Long> getInFlightPointers() {
            return this.inflightEvents.values();
        }

        public void close() throws IOException {
            this.file.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlumeEventQueue(EventQueueBackingStore eventQueueBackingStore, File file, File file2) throws Exception {
        Preconditions.checkArgument(eventQueueBackingStore.getCapacity() > 0, "Capacity must be greater than zero");
        this.channelNameDescriptor = "[channel=" + eventQueueBackingStore.getName() + "]";
        this.backingStore = eventQueueBackingStore;
        try {
            this.inflightPuts = new InflightEventWrapper(file2);
            this.inflightTakes = new InflightEventWrapper(file);
        } catch (Exception e) {
            LOG.error("Could not read checkpoint.", e);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SetMultimap<Long, Long> deserializeInflightPuts() throws IOException, BadCheckpointException {
        return this.inflightPuts.deserialize();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SetMultimap<Long, Long> deserializeInflightTakes() throws IOException, BadCheckpointException {
        return this.inflightTakes.deserialize();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized long getLogWriteOrderID() {
        return this.backingStore.getLogWriteOrderID();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean checkpoint(boolean z) throws Exception {
        if (!this.backingStore.syncRequired() && !this.inflightTakes.syncRequired() && !z) {
            LOG.debug("Checkpoint not required");
            return false;
        }
        this.backingStore.beginCheckpoint();
        this.inflightPuts.serializeAndWrite();
        this.inflightTakes.serializeAndWrite();
        this.backingStore.checkpoint();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized FlumeEventPointer removeHead(long j) {
        if (this.backingStore.getSize() == 0) {
            return null;
        }
        long remove = remove(0, j);
        Preconditions.checkState(remove != 0, "Empty value " + this.channelNameDescriptor);
        FlumeEventPointer fromLong = FlumeEventPointer.fromLong(remove);
        this.backingStore.decrementFileID(fromLong.getFileID());
        return fromLong;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean addHead(FlumeEventPointer flumeEventPointer) {
        if (this.backingStore.getSize() == this.backingStore.getCapacity()) {
            LOG.error("Could not reinsert to queue, events which were taken but not committed. Please report this issue.");
            return false;
        }
        long j = flumeEventPointer.toLong();
        Preconditions.checkArgument(j != 0);
        this.backingStore.incrementFileID(flumeEventPointer.getFileID());
        add(0, j);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean addTail(FlumeEventPointer flumeEventPointer) {
        if (getSize() == this.backingStore.getCapacity()) {
            return false;
        }
        long j = flumeEventPointer.toLong();
        Preconditions.checkArgument(j != 0);
        this.backingStore.incrementFileID(flumeEventPointer.getFileID());
        add(this.backingStore.getSize(), j);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addWithoutCommit(FlumeEventPointer flumeEventPointer, long j) {
        this.inflightPuts.addEvent(Long.valueOf(j), Long.valueOf(flumeEventPointer.toLong()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean remove(FlumeEventPointer flumeEventPointer) {
        long j = flumeEventPointer.toLong();
        Preconditions.checkArgument(j != 0);
        for (int i = 0; i < this.backingStore.getSize(); i++) {
            if (get(i) == j) {
                remove(i, 0L);
                this.backingStore.decrementFileID(FlumeEventPointer.fromLong(j).getFileID());
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized SortedSet<Integer> getFileIDs() {
        TreeSet treeSet = new TreeSet((SortedSet) this.backingStore.getReferenceCounts());
        treeSet.addAll(this.inflightPuts.getFileIDs());
        treeSet.addAll(this.inflightTakes.getFileIDs());
        return treeSet;
    }

    protected long get(int i) {
        if (i < 0 || i > this.backingStore.getSize() - 1) {
            throw new IndexOutOfBoundsException(String.valueOf(i) + this.channelNameDescriptor);
        }
        return this.backingStore.get(i);
    }

    private void set(int i, long j) {
        if (i < 0 || i > this.backingStore.getSize() - 1) {
            throw new IndexOutOfBoundsException(String.valueOf(i) + this.channelNameDescriptor);
        }
        this.backingStore.put(i, j);
    }

    protected boolean add(int i, long j) {
        if (i < 0 || i > this.backingStore.getSize()) {
            throw new IndexOutOfBoundsException(String.valueOf(i) + this.channelNameDescriptor);
        }
        if (this.backingStore.getSize() == this.backingStore.getCapacity()) {
            return false;
        }
        this.backingStore.setSize(this.backingStore.getSize() + 1);
        if (i <= this.backingStore.getSize() / 2) {
            this.backingStore.setHead(this.backingStore.getHead() - 1);
            if (this.backingStore.getHead() < 0) {
                this.backingStore.setHead(this.backingStore.getCapacity() - 1);
            }
            for (int i2 = 0; i2 < i; i2++) {
                set(i2, get(i2 + 1));
            }
        } else {
            for (int size = this.backingStore.getSize() - 1; size > i; size--) {
                set(size, get(size - 1));
            }
        }
        set(i, j);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void completeTransaction(long j) {
        if (this.inflightPuts.completeTransaction(Long.valueOf(j))) {
            return;
        }
        this.inflightTakes.completeTransaction(Long.valueOf(j));
    }

    protected synchronized long remove(int i, long j) {
        if (i < 0 || i > this.backingStore.getSize() - 1) {
            throw new IndexOutOfBoundsException("index = " + i + ", queueSize " + this.backingStore.getSize() + " " + this.channelNameDescriptor);
        }
        long j2 = get(i);
        if (j != 0) {
            this.inflightTakes.addEvent(Long.valueOf(j), Long.valueOf(j2));
        }
        if (i > this.backingStore.getSize() / 2) {
            for (int i2 = i; i2 < this.backingStore.getSize() - 1; i2++) {
                set(i2, get(i2 + 1));
            }
            set(this.backingStore.getSize() - 1, 0L);
        } else {
            for (int i3 = i - 1; i3 >= 0; i3--) {
                set(i3 + 1, get(i3));
            }
            set(0, 0L);
            this.backingStore.setHead(this.backingStore.getHead() + 1);
            if (this.backingStore.getHead() == this.backingStore.getCapacity()) {
                this.backingStore.setHead(0);
            }
        }
        this.backingStore.setSize(this.backingStore.getSize() - 1);
        return j2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized int getSize() {
        return this.backingStore.getSize() + this.inflightTakes.getSize();
    }

    public int getCapacity() {
        return this.backingStore.getCapacity();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void close() throws IOException {
        try {
            this.backingStore.close();
            this.inflightPuts.close();
            this.inflightTakes.close();
        } catch (IOException e) {
            LOG.warn("Error closing backing store", e);
        }
    }
}
