/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.contrib.bkjournal;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.io.DataOutputBuffer;

class BookKeeperEditLogOutputStream
extends EditLogOutputStream
implements AsyncCallback.AddCallback {
    static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class);
    private final DataOutputBuffer bufCurrent;
    private final AtomicInteger outstandingRequests;
    private final int transmissionThreshold;
    private final LedgerHandle lh;
    private CountDownLatch syncLatch = null;
    private final AtomicInteger transmitResult = new AtomicInteger(0);
    private final FSEditLogOp.Writer writer;

    protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh) throws IOException {
        this.bufCurrent = new DataOutputBuffer();
        this.outstandingRequests = new AtomicInteger(0);
        this.lh = lh;
        this.writer = new FSEditLogOp.Writer(this.bufCurrent);
        this.transmissionThreshold = conf.getInt("dfs.namenode.bookkeeperjournal.output-buffer-size", 1024);
    }

    public void create(int layoutVersion) throws IOException {
    }

    public void close() throws IOException {
        this.setReadyToFlush();
        this.flushAndSync(true);
        try {
            this.lh.close();
        }
        catch (InterruptedException ie) {
            throw new IOException("Interrupted waiting on close", ie);
        }
        catch (BKException bke) {
            throw new IOException("BookKeeper error during close", bke);
        }
    }

    public void abort() throws IOException {
        try {
            this.lh.close();
        }
        catch (InterruptedException ie) {
            throw new IOException("Interrupted waiting on close", ie);
        }
        catch (BKException bke) {
            throw new IOException("BookKeeper error during abort", bke);
        }
    }

    public void writeRaw(byte[] data, int off, int len) throws IOException {
        throw new IOException("Not supported for BK");
    }

    public void write(FSEditLogOp op) throws IOException {
        this.writer.writeOp(op);
        if (this.bufCurrent.getLength() > this.transmissionThreshold) {
            this.transmit();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setReadyToFlush() throws IOException {
        this.transmit();
        BookKeeperEditLogOutputStream bookKeeperEditLogOutputStream = this;
        synchronized (bookKeeperEditLogOutputStream) {
            this.syncLatch = new CountDownLatch(this.outstandingRequests.get());
        }
    }

    public void flushAndSync(boolean durable) throws IOException {
        assert (this.syncLatch != null);
        try {
            this.syncLatch.await();
        }
        catch (InterruptedException ie) {
            throw new IOException("Interrupted waiting on latch", ie);
        }
        if (this.transmitResult.get() != 0) {
            throw new IOException("Failed to write to bookkeeper; Error is (" + this.transmitResult.get() + ") " + BKException.getMessage((int)this.transmitResult.get()));
        }
        this.syncLatch = null;
    }

    private void transmit() throws IOException {
        if (!this.transmitResult.compareAndSet(0, 0)) {
            throw new IOException("Trying to write to an errored stream; Error code : (" + this.transmitResult.get() + ") " + BKException.getMessage((int)this.transmitResult.get()));
        }
        if (this.bufCurrent.getLength() > 0) {
            byte[] entry = Arrays.copyOf(this.bufCurrent.getData(), this.bufCurrent.getLength());
            this.lh.asyncAddEntry(entry, (AsyncCallback.AddCallback)this, null);
            this.bufCurrent.reset();
            this.outstandingRequests.incrementAndGet();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addComplete(int rc, LedgerHandle handle, long entryId, Object ctx) {
        BookKeeperEditLogOutputStream bookKeeperEditLogOutputStream = this;
        synchronized (bookKeeperEditLogOutputStream) {
            CountDownLatch l;
            this.outstandingRequests.decrementAndGet();
            if (!this.transmitResult.compareAndSet(0, rc)) {
                LOG.warn((Object)("Tried to set transmit result to (" + rc + ") \"" + BKException.getMessage((int)rc) + "\" but is already (" + this.transmitResult.get() + ") \"" + BKException.getMessage((int)this.transmitResult.get()) + "\""));
            }
            if ((l = this.syncLatch) != null) {
                l.countDown();
            }
        }
    }
}

