/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.contrib.bkjournal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos;
import org.apache.hadoop.contrib.bkjournal.BookKeeperEditLogInputStream;
import org.apache.hadoop.contrib.bkjournal.BookKeeperEditLogOutputStream;
import org.apache.hadoop.contrib.bkjournal.CurrentInprogress;
import org.apache.hadoop.contrib.bkjournal.EditLogLedgerMetadata;
import org.apache.hadoop.contrib.bkjournal.MaxTxId;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class BookKeeperJournalManager
implements JournalManager {
    static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
    public static final String BKJM_OUTPUT_BUFFER_SIZE = "dfs.namenode.bookkeeperjournal.output-buffer-size";
    public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
    public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE = "dfs.namenode.bookkeeperjournal.ensemble-size";
    public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
    public static final String BKJM_BOOKKEEPER_QUORUM_SIZE = "dfs.namenode.bookkeeperjournal.quorum-size";
    public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
    public static final String BKJM_BOOKKEEPER_DIGEST_PW = "dfs.namenode.bookkeeperjournal.digestPw";
    public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
    private static final int BKJM_LAYOUT_VERSION = -1;
    public static final String BKJM_ZK_SESSION_TIMEOUT = "dfs.namenode.bookkeeperjournal.zk.session.timeout";
    public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
    private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
    public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH = "dfs.namenode.bookkeeperjournal.zk.availablebookies";
    public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT = "/ledgers/available";
    public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
    public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT = 2000;
    public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
    public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
    public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE = "dfs.namenode.bookkeeperjournal.ack.quorum-size";
    public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
    public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
    private ZooKeeper zkc;
    private final Configuration conf;
    private final BookKeeper bkc;
    private final CurrentInprogress ci;
    private final String basePath;
    private final String ledgerPath;
    private final String versionPath;
    private final MaxTxId maxTxId;
    private final int ensembleSize;
    private final int quorumSize;
    private final int ackQuorumSize;
    private final int addEntryTimeout;
    private final String digestpw;
    private final int speculativeReadTimeout;
    private final int readEntryTimeout;
    private final CountDownLatch zkConnectLatch;
    private final NamespaceInfo nsInfo;
    private boolean initialized = false;
    private LedgerHandle currentLedger = null;

    public BookKeeperJournalManager(Configuration conf, URI uri, NamespaceInfo nsInfo) throws IOException {
        this.conf = conf;
        this.nsInfo = nsInfo;
        String zkConnect = uri.getAuthority().replace(";", ",");
        this.basePath = uri.getPath();
        this.ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE, 3);
        this.quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, 2);
        this.ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, this.quorumSize);
        this.addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC, 5);
        this.speculativeReadTimeout = conf.getInt(BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS, 2000);
        this.readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 5);
        this.ledgerPath = this.basePath + "/ledgers";
        String maxTxIdPath = this.basePath + "/maxtxid";
        String currentInprogressNodePath = this.basePath + "/CurrentInprogress";
        this.versionPath = this.basePath + "/version";
        this.digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW, BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
        try {
            this.zkConnectLatch = new CountDownLatch(1);
            int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT, 3000);
            this.zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout, (Watcher)new ZkConnectionWatcher());
            int zkConnectionLatchTimeout = bkjmZKSessionTimeout + 3000;
            if (!this.zkConnectLatch.await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) {
                throw new IOException("Error connecting to zookeeper");
            }
            this.prepareBookKeeperEnv();
            ClientConfiguration clientConf = new ClientConfiguration();
            clientConf.setSpeculativeReadTimeout(this.speculativeReadTimeout);
            clientConf.setReadEntryTimeout(this.readEntryTimeout);
            clientConf.setAddEntryTimeout(this.addEntryTimeout);
            this.bkc = new BookKeeper(clientConf, this.zkc);
        }
        catch (KeeperException e) {
            throw new IOException("Error initializing zk", e);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while initializing bk journal manager", ie);
        }
        this.ci = new CurrentInprogress(this.zkc, currentInprogressNodePath);
        this.maxTxId = new MaxTxId(this.zkc, maxTxIdPath);
    }

    private void prepareBookKeeperEnv() throws IOException {
        final String zkAvailablePath = this.conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH, BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
        final CountDownLatch zkPathLatch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean(false);
        AsyncCallback.StringCallback callback = new AsyncCallback.StringCallback(){

            public void processResult(int rc, String path, Object ctx, String name) {
                if (KeeperException.Code.OK.intValue() == rc || KeeperException.Code.NODEEXISTS.intValue() == rc) {
                    LOG.info((Object)("Successfully created bookie available path : " + zkAvailablePath));
                    success.set(true);
                } else {
                    KeeperException.Code code = KeeperException.Code.get((int)rc);
                    LOG.error((Object)("Error : " + KeeperException.create((KeeperException.Code)code, (String)path).getMessage() + ", failed to create bookie available path : " + zkAvailablePath));
                }
                zkPathLatch.countDown();
            }
        };
        ZkUtils.asyncCreateFullPathOptimistic((ZooKeeper)this.zkc, (String)zkAvailablePath, (byte[])new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT, (AsyncCallback.StringCallback)callback, null);
        try {
            if (!zkPathLatch.await(this.zkc.getSessionTimeout(), TimeUnit.MILLISECONDS) || !success.get()) {
                throw new IOException("Couldn't create bookie available path :" + zkAvailablePath + ", timed out " + this.zkc.getSessionTimeout() + " millis");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted when creating the bookie available path : " + zkAvailablePath, e);
        }
    }

    public void format(NamespaceInfo ns) throws IOException {
        try {
            Stat baseStat = null;
            Stat ledgerStat = null;
            baseStat = this.zkc.exists(this.basePath, false);
            if (baseStat != null) {
                ledgerStat = this.zkc.exists(this.ledgerPath, false);
                if (ledgerStat != null) {
                    for (EditLogLedgerMetadata l : this.getLedgerList(true)) {
                        try {
                            this.bkc.deleteLedger(l.getLedgerId());
                        }
                        catch (BKException.BKNoSuchLedgerExistsException bke) {
                            LOG.warn((Object)("Ledger " + l.getLedgerId() + " does not exist;" + " Cannot delete."));
                        }
                    }
                }
                ZKUtil.deleteRecursive((ZooKeeper)this.zkc, (String)this.basePath);
            }
            this.zkc.create(this.basePath, new byte[]{48}, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            BKJournalProtos.VersionProto.Builder builder = BKJournalProtos.VersionProto.newBuilder();
            builder.setNamespaceInfo(PBHelper.convert((NamespaceInfo)ns)).setLayoutVersion(-1);
            byte[] data = TextFormat.printToString((MessageOrBuilder)builder.build()).getBytes(Charsets.UTF_8);
            this.zkc.create(this.versionPath, data, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            this.zkc.create(this.ledgerPath, new byte[]{48}, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        catch (KeeperException ke) {
            LOG.error((Object)"Error accessing zookeeper to format", (Throwable)ke);
            throw new IOException("Error accessing zookeeper to format", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted during format", ie);
        }
        catch (BKException bke) {
            throw new IOException("Error cleaning up ledgers during format", bke);
        }
    }

    public boolean hasSomeData() throws IOException {
        try {
            return this.zkc.exists(this.basePath, false) != null;
        }
        catch (KeeperException ke) {
            throw new IOException("Couldn't contact zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while checking for data", ie);
        }
    }

    private synchronized void checkEnv() throws IOException {
        if (!this.initialized) {
            try {
                Stat versionStat = this.zkc.exists(this.versionPath, false);
                if (versionStat == null) {
                    throw new IOException("Environment not initialized. Have you forgotten to format?");
                }
                byte[] d = this.zkc.getData(this.versionPath, false, versionStat);
                BKJournalProtos.VersionProto.Builder builder = BKJournalProtos.VersionProto.newBuilder();
                TextFormat.merge((CharSequence)new String(d, Charsets.UTF_8), (Message.Builder)builder);
                if (!builder.isInitialized()) {
                    throw new IOException("Invalid/Incomplete data in znode");
                }
                BKJournalProtos.VersionProto vp = builder.build();
                assert (vp.getLayoutVersion() == -1);
                NamespaceInfo readns = PBHelper.convert((HdfsProtos.NamespaceInfoProto)vp.getNamespaceInfo());
                if (this.nsInfo.getNamespaceID() != readns.getNamespaceID() || !this.nsInfo.clusterID.equals(readns.getClusterID()) || !this.nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
                    String err = String.format("Environment mismatch. Running process %s, stored in ZK %s", this.nsInfo, readns);
                    LOG.error((Object)err);
                    throw new IOException(err);
                }
                this.ci.init();
                this.initialized = true;
            }
            catch (KeeperException ke) {
                throw new IOException("Cannot access ZooKeeper", ke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while checking environment", ie);
            }
        }
    }

    public EditLogOutputStream startLogSegment(long txId, int layoutVersion) throws IOException {
        this.checkEnv();
        if (txId <= this.maxTxId.get()) {
            throw new IOException("We've already seen " + txId + ". A new stream cannot be created with it");
        }
        try {
            String existingInprogressNode = this.ci.read();
            if (null != existingInprogressNode && this.zkc.exists(existingInprogressNode, false) != null) {
                throw new IOException("Inprogress node already exists");
            }
            if (this.currentLedger != null) {
                this.currentLedger.close();
            }
            this.currentLedger = this.bkc.createLedger(this.ensembleSize, this.quorumSize, this.ackQuorumSize, BookKeeper.DigestType.MAC, this.digestpw.getBytes(org.apache.commons.io.Charsets.UTF_8));
        }
        catch (BKException bke) {
            throw new IOException("Error creating ledger", bke);
        }
        catch (KeeperException ke) {
            throw new IOException("Error in zookeeper while creating ledger", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted creating ledger", ie);
        }
        try {
            String znodePath = this.inprogressZNode(txId);
            EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath, layoutVersion, this.currentLedger.getId(), txId);
            l.write(this.zkc, znodePath);
            this.maxTxId.store(txId);
            this.ci.update(znodePath);
            return new BookKeeperEditLogOutputStream(this.conf, this.currentLedger);
        }
        catch (KeeperException ke) {
            this.cleanupLedger(this.currentLedger);
            throw new IOException("Error storing ledger metadata", ke);
        }
    }

    private void cleanupLedger(LedgerHandle lh) {
        try {
            long id = this.currentLedger.getId();
            this.currentLedger.close();
            this.bkc.deleteLedger(id);
        }
        catch (BKException bke) {
            LOG.error((Object)"Error closing ledger", (Throwable)bke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.warn((Object)"Interrupted while closing ledger", (Throwable)ie);
        }
    }

    public void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException {
        this.checkEnv();
        String inprogressPath = this.inprogressZNode(firstTxId);
        try {
            Stat inprogressStat;
            block13: {
                inprogressStat = this.zkc.exists(inprogressPath, false);
                if (inprogressStat == null) {
                    throw new IOException("Inprogress znode " + inprogressPath + " doesn't exist");
                }
                EditLogLedgerMetadata l = EditLogLedgerMetadata.read(this.zkc, inprogressPath);
                if (this.currentLedger != null) {
                    if (l.getLedgerId() == this.currentLedger.getId()) {
                        try {
                            this.currentLedger.close();
                        }
                        catch (BKException bke) {
                            LOG.error((Object)"Error closing current ledger", (Throwable)bke);
                        }
                        this.currentLedger = null;
                    } else {
                        throw new IOException("Active ledger has different ID to inprogress. " + l.getLedgerId() + " found, " + this.currentLedger.getId() + " expected");
                    }
                }
                if (l.getFirstTxId() != firstTxId) {
                    throw new IOException("Transaction id not as expected, " + l.getFirstTxId() + " found, " + firstTxId + " expected");
                }
                l.finalizeLedger(lastTxId);
                String finalisedPath = this.finalizedLedgerZNode(firstTxId, lastTxId);
                try {
                    l.write(this.zkc, finalisedPath);
                }
                catch (KeeperException.NodeExistsException nee) {
                    if (l.verify(this.zkc, finalisedPath)) break block13;
                    throw new IOException("Node " + finalisedPath + " already exists" + " but data doesn't match");
                }
            }
            this.maxTxId.store(lastTxId);
            this.zkc.delete(inprogressPath, inprogressStat.getVersion());
            String inprogressPathFromCI = this.ci.read();
            if (inprogressPath.equals(inprogressPathFromCI)) {
                this.ci.clear();
            }
        }
        catch (KeeperException e) {
            throw new IOException("Error finalising ledger", e);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Error finalising ledger", ie);
        }
    }

    public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) throws IOException {
        List<EditLogLedgerMetadata> currentLedgerList = this.getLedgerList(fromTxId, inProgressOk);
        try {
            BookKeeperEditLogInputStream elis = null;
            for (EditLogLedgerMetadata l : currentLedgerList) {
                long lastTxId = l.getLastTxId();
                if (l.isInProgress()) {
                    lastTxId = this.recoverLastTxId(l, false);
                }
                if (fromTxId < l.getFirstTxId() || fromTxId > lastTxId) {
                    return;
                }
                LedgerHandle h = l.isInProgress() ? this.bkc.openLedgerNoRecovery(l.getLedgerId(), BookKeeper.DigestType.MAC, this.digestpw.getBytes(org.apache.commons.io.Charsets.UTF_8)) : this.bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, this.digestpw.getBytes(org.apache.commons.io.Charsets.UTF_8));
                elis = new BookKeeperEditLogInputStream(h, l);
                elis.skipTo(fromTxId);
                streams.add(elis);
                if (elis.getLastTxId() == -12345L) {
                    return;
                }
                fromTxId = elis.getLastTxId() + 1L;
            }
        }
        catch (BKException e) {
            throw new IOException("Could not open ledger for " + fromTxId, e);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
        }
    }

    long getNumberOfTransactions(long fromTxId, boolean inProgressOk) throws IOException {
        long count = 0L;
        long expectedStart = 0L;
        for (EditLogLedgerMetadata l : this.getLedgerList(inProgressOk)) {
            long lastTxId = l.getLastTxId();
            if (l.isInProgress() && (lastTxId = this.recoverLastTxId(l, false)) == -12345L) break;
            assert (lastTxId >= l.getFirstTxId());
            if (lastTxId < fromTxId) continue;
            if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
                count = lastTxId - l.getFirstTxId() + 1L;
                expectedStart = lastTxId + 1L;
                continue;
            }
            if (expectedStart != l.getFirstTxId()) {
                if (count != 0L) break;
                throw new JournalManager.CorruptionException("StartTxId " + l.getFirstTxId() + " is not as expected " + expectedStart + ". Gap in transaction log?");
            }
            count += lastTxId - l.getFirstTxId() + 1L;
            expectedStart = lastTxId + 1L;
        }
        return count;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recoverUnfinalizedSegments() throws IOException {
        this.checkEnv();
        BookKeeperJournalManager bookKeeperJournalManager = this;
        synchronized (bookKeeperJournalManager) {
            try {
                List children = this.zkc.getChildren(this.ledgerPath, false);
                for (String child : children) {
                    if (!child.startsWith(BKJM_EDIT_INPROGRESS)) continue;
                    String znode = this.ledgerPath + "/" + child;
                    EditLogLedgerMetadata l = EditLogLedgerMetadata.read(this.zkc, znode);
                    try {
                        long endTxId = this.recoverLastTxId(l, true);
                        if (endTxId == -12345L) {
                            LOG.error((Object)("Unrecoverable corruption has occurred in segment " + l.toString() + " at path " + znode + ". Unable to continue recovery."));
                            throw new IOException("Unrecoverable corruption, please check logs.");
                        }
                        this.finalizeLogSegment(l.getFirstTxId(), endTxId);
                    }
                    catch (SegmentEmptyException see) {
                        LOG.warn((Object)("Inprogress znode " + child + " refers to a ledger which is empty. This occurs when the NN" + " crashes after opening a segment, but before writing the" + " OP_START_LOG_SEGMENT op. It is safe to delete." + " MetaData [" + l.toString() + "]"));
                        if (this.maxTxId.get() == l.getFirstTxId()) {
                            this.maxTxId.reset(this.maxTxId.get() - 1L);
                        }
                        this.zkc.delete(znode, -1);
                    }
                }
            }
            catch (KeeperException.NoNodeException nne) {
            }
            catch (KeeperException ke) {
                throw new IOException("Couldn't get list of inprogress segments", ke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted getting list of inprogress segments", ie);
            }
        }
    }

    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
        this.checkEnv();
        for (EditLogLedgerMetadata l : this.getLedgerList(false)) {
            if (l.getLastTxId() >= minTxIdToKeep) continue;
            try {
                Stat stat = this.zkc.exists(l.getZkPath(), false);
                this.zkc.delete(l.getZkPath(), stat.getVersion());
                this.bkc.deleteLedger(l.getLedgerId());
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error((Object)("Interrupted while purging " + l), (Throwable)ie);
            }
            catch (BKException bke) {
                LOG.error((Object)"Couldn't delete ledger from bookkeeper", (Throwable)bke);
            }
            catch (KeeperException ke) {
                LOG.error((Object)"Error deleting ledger entry in zookeeper", (Throwable)ke);
            }
        }
    }

    public void discardSegments(long startTxId) throws IOException {
        throw new UnsupportedOperationException();
    }

    public void doPreUpgrade() throws IOException {
        throw new UnsupportedOperationException();
    }

    public void doUpgrade(Storage storage) throws IOException {
        throw new UnsupportedOperationException();
    }

    public long getJournalCTime() throws IOException {
        throw new UnsupportedOperationException();
    }

    public void doFinalize() throws IOException {
        throw new UnsupportedOperationException();
    }

    public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
        throw new UnsupportedOperationException();
    }

    public void doRollback() throws IOException {
        throw new UnsupportedOperationException();
    }

    public void close() throws IOException {
        try {
            this.bkc.close();
            this.zkc.close();
        }
        catch (BKException bke) {
            throw new IOException("Couldn't close bookkeeper client", bke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while closing journal manager", ie);
        }
    }

    public void setOutputBufferCapacity(int size) {
        this.conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence) throws IOException, SegmentEmptyException {
        LedgerHandle lh = null;
        try {
            lh = fence ? this.bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, this.digestpw.getBytes(org.apache.commons.io.Charsets.UTF_8)) : this.bkc.openLedgerNoRecovery(l.getLedgerId(), BookKeeper.DigestType.MAC, this.digestpw.getBytes(org.apache.commons.io.Charsets.UTF_8));
        }
        catch (BKException bke) {
            throw new IOException("Exception opening ledger for " + l, bke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted opening ledger for " + l, ie);
        }
        try (BookKeeperEditLogInputStream in = null;){
            long lastAddConfirmed = lh.getLastAddConfirmed();
            if (lastAddConfirmed == -1L) {
                throw new SegmentEmptyException();
            }
            in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
            long endTxId = -12345L;
            FSEditLogOp op = in.readOp();
            while (op != null) {
                if (endTxId == -12345L || op.getTransactionId() == endTxId + 1L) {
                    endTxId = op.getTransactionId();
                }
                op = in.readOp();
            }
            long l2 = endTxId;
            return l2;
        }
    }

    List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk) throws IOException {
        return this.getLedgerList(-1L, inProgressOk);
    }

    private List<EditLogLedgerMetadata> getLedgerList(long fromTxId, boolean inProgressOk) throws IOException {
        ArrayList<EditLogLedgerMetadata> ledgers = new ArrayList<EditLogLedgerMetadata>();
        try {
            List ledgerNames = this.zkc.getChildren(this.ledgerPath, false);
            for (String ledgerName : ledgerNames) {
                if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) continue;
                String legderMetadataPath = this.ledgerPath + "/" + ledgerName;
                try {
                    EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata.read(this.zkc, legderMetadataPath);
                    if (editLogLedgerMetadata.getLastTxId() != -12345L && editLogLedgerMetadata.getLastTxId() < fromTxId) continue;
                    ledgers.add(editLogLedgerMetadata);
                }
                catch (KeeperException.NoNodeException e) {
                    LOG.warn((Object)("ZNode: " + legderMetadataPath + " might have finalized and deleted." + " So ignoring NoNodeException."));
                }
            }
        }
        catch (KeeperException e) {
            throw new IOException("Exception reading ledger list from zk", e);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted getting list of ledgers from zk", ie);
        }
        Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
        return ledgers;
    }

    String finalizedLedgerZNode(long startTxId, long endTxId) {
        return String.format("%s/edits_%018d_%018d", this.ledgerPath, startTxId, endTxId);
    }

    String inprogressZNode(long startTxid) {
        return this.ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
    }

    @VisibleForTesting
    void setZooKeeper(ZooKeeper zk) {
        this.zkc = zk;
    }

    private static class SegmentEmptyException
    extends IOException {
        private SegmentEmptyException() {
        }
    }

    private class ZkConnectionWatcher
    implements Watcher {
        private ZkConnectionWatcher() {
        }

        public void process(WatchedEvent event) {
            if (Watcher.Event.KeeperState.SyncConnected.equals((Object)event.getState())) {
                BookKeeperJournalManager.this.zkConnectLatch.countDown();
            }
        }
    }
}

