/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metalog;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LocalLogManager
implements RaftClient<ApiMessageAndVersion>,
AutoCloseable {
    /** Logger obtained from the {@link LogContext} passed to the constructor. */
    private final Logger log;
    /** Id of the node this log manager acts for. */
    private final int nodeId;
    /** Log state shared between the log managers registered on it. */
    private final SharedLogData shared;
    /** Queue on which log checks, listener (un)registration and the shutdown event run. */
    private final EventQueue eventQueue;
    /** Set to true by the event queued in {@link #initialize()}. */
    private boolean initialized = false;
    /** Set to true at the end of {@link ShutdownEvent#run()}. */
    private boolean shutdown = false;
    /** Log checks do not deliver entries whose offset is greater than this value. */
    private long maxReadOffset = Long.MAX_VALUE;
    /** Registered listeners, keyed by identity, with their per-listener read state. */
    private final Map<RaftClient.Listener<ApiMessageAndVersion>, MetaLogListenerData> listeners = new IdentityHashMap<RaftClient.Listener<ApiMessageAndVersion>, MetaLogListenerData>();
    /** Latest leader seen by a log check; only replaced by a leader change with a higher epoch. */
    private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
    /** When true, the next {@link #scheduleAppend} that wrote a first half resigns instead of writing the rest. */
    private AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);

    /**
     * Creates a log manager for {@code nodeId} and registers it with {@code shared}.
     *
     * @param logContext       source of the logger and of the event queue's log context
     * @param nodeId           id of the node this manager represents
     * @param shared           shared log data; also supplies the initial max read offset
     * @param threadNamePrefix passed to the {@link KafkaEventQueue} constructor
     */
    public LocalLogManager(LogContext logContext, int nodeId, SharedLogData shared, String threadNamePrefix) {
        this.log = logContext.logger(LocalLogManager.class);
        this.nodeId = nodeId;
        this.shared = shared;
        this.maxReadOffset = shared.initialMaxReadOffset();
        // The ShutdownEvent is handed to the queue at construction time.
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix, (EventQueue.Event)new ShutdownEvent());
        // Must come after eventQueue is assigned: registering may elect a leader, which appends a
        // batch and calls scheduleLogCheck() on every registered manager, including this one.
        shared.registerLogManager(this);
    }

    /**
     * Queues an event that, for every registered listener, delivers the snapshot, leader changes
     * and record batches that lie beyond the listener's current offset, stopping at the end of
     * the shared log or at {@code maxReadOffset}. Any exception raised while doing so is logged
     * and not propagated.
     */
    private void scheduleLogCheck() {
        this.eventQueue.append(() -> {
            try {
                this.log.debug("Node {}: running log check.", (Object)this.nodeId);
                // Counter is shared across listeners; it is only used in trace messages.
                int numEntriesFound = 0;
                block2: for (MetaLogListenerData listenerData : this.listeners.values()) {
                    while (true) {
                        LocalBatch batch;
                        Map.Entry<Long, LocalBatch> entry;
                        Optional<RawSnapshotReader> snapshot;
                        LeaderAndEpoch notifiedLeader = listenerData.notifiedLeader();
                        // Load a snapshot only when this node is not the leader this listener was last
                        // told about, and the shared log has a snapshot at or beyond the listener's offset.
                        if (!OptionalInt.of(this.nodeId).equals(notifiedLeader.leaderId()) && (snapshot = this.shared.nextSnapshot(listenerData.offset())).isPresent()) {
                            this.log.trace("Node {}: handling snapshot with id {}.", (Object)this.nodeId, (Object)snapshot.get().snapshotId());
                            listenerData.handleLoadSnapshot((SnapshotReader<ApiMessageAndVersion>)RecordsSnapshotReader.of((RawSnapshotReader)snapshot.get(), (RecordSerde)new MetadataRecordSerde(), (BufferSupplier)BufferSupplier.create(), (int)Integer.MAX_VALUE, (boolean)true));
                        }
                        // No batch after the listener's offset: move on to the next listener.
                        if ((entry = this.shared.nextBatch(listenerData.offset())) == null) {
                            this.log.trace("Node {}: reached the end of the log after finding {} entries.", (Object)this.nodeId, (Object)numEntriesFound);
                            continue block2;
                        }
                        long entryOffset = entry.getKey();
                        // Respect the read limit: leave this entry for a later log check.
                        if (entryOffset > this.maxReadOffset) {
                            this.log.trace("Node {}: after {} entries, not reading the next entry because its offset is {}, and maxReadOffset is {}.", new Object[]{this.nodeId, numEntriesFound, entryOffset, this.maxReadOffset});
                            continue block2;
                        }
                        if (entry.getValue() instanceof LeaderChangeBatch) {
                            batch = (LeaderChangeBatch)entry.getValue();
                            this.log.trace("Node {}: handling LeaderChange to {}.", (Object)this.nodeId, (Object)((LeaderChangeBatch)batch).newLeader);
                            LeaderAndEpoch sharedLeader = this.shared.leaderAndEpoch();
                            // Only a leader change that matches the shared log's current leader is reported.
                            if (((LeaderChangeBatch)batch).newLeader.equals((Object)sharedLeader)) {
                                this.log.debug("Node {}: Executing handleLeaderChange {}", (Object)this.nodeId, (Object)sharedLeader);
                                listenerData.handleLeaderChange(entryOffset, ((LeaderChangeBatch)batch).newLeader);
                                // The manager-level leader only ever moves forward in epoch.
                                if (((LeaderChangeBatch)batch).newLeader.epoch() > this.leader.epoch()) {
                                    this.leader = ((LeaderChangeBatch)batch).newLeader;
                                }
                            } else {
                                // Outdated leader change: skip the callback but still advance past it.
                                this.log.debug("Node {}: Ignoring {} since it doesn't match the latest known leader {}", new Object[]{this.nodeId, ((LeaderChangeBatch)batch).newLeader, sharedLeader});
                                listenerData.setOffset(entryOffset);
                            }
                        } else if (entry.getValue() instanceof LocalRecordBatch) {
                            batch = (LocalRecordBatch)entry.getValue();
                            this.log.trace("Node {}: handling LocalRecordBatch with offset {}.", (Object)this.nodeId, (Object)entryOffset);
                            ObjectSerializationCache objectCache = new ObjectSerializationCache();
                            // Batches are keyed by the offset of their last record (see SharedLogData.append),
                            // so the base offset is entryOffset - size + 1. The size argument is the sum of the
                            // per-record sizes; the trailing lambda is an empty callback handed to
                            // MemoryBatchReader.of (presumably the close listener - confirm).
                            listenerData.handleCommit((MemoryBatchReader<ApiMessageAndVersion>)MemoryBatchReader.of(Collections.singletonList(Batch.data((long)(entryOffset - (long)((LocalRecordBatch)batch).records.size() + 1L), (int)((LocalRecordBatch)batch).leaderEpoch, (long)((LocalRecordBatch)batch).appendTimestamp, (int)((LocalRecordBatch)batch).records.stream().mapToInt(record -> LocalLogManager.messageSize(record, objectCache)).sum(), (List)((LocalRecordBatch)batch).records)), reader -> {}));
                        }
                        ++numEntriesFound;
                    }
                }
                this.log.trace("Completed log check for node " + this.nodeId);
            }
            catch (Exception e) {
                this.log.error("Exception while handling log check", (Throwable)e);
            }
        });
    }

    /**
     * Returns the size of {@code messageAndVersion} as reported by
     * {@link MetadataRecordSerde#recordSize}, using {@code objectCache} as the serialization cache.
     */
    private static int messageSize(ApiMessageAndVersion messageAndVersion, ObjectSerializationCache objectCache) {
        MetadataRecordSerde serde = new MetadataRecordSerde();
        return serde.recordSize(messageAndVersion, objectCache);
    }

    /** Asks the event queue to begin shutting down; does not wait for it to finish. */
    public void beginShutdown() {
        eventQueue.beginShutdown("beginShutdown");
    }

    /**
     * Begins shutdown and then closes the event queue.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if closing the queue is
     *         interrupted; the thread's interrupt flag is restored first
     */
    @Override
    public void close() {
        log.debug("Node {}: closing.", nodeId);
        beginShutdown();
        try {
            eventQueue.close();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Closes this log manager synchronously and reports the outcome as a future.
     *
     * @param timeoutMs not used by this implementation
     * @return a future that is already completed: normally if {@link #close()} returned,
     *         exceptionally with whatever {@link #close()} threw otherwise
     */
    public CompletableFuture<Void> shutdown(int timeoutMs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            close();
        } catch (Throwable failure) {
            result.completeExceptionally(failure);
            return result;
        }
        result.complete(null);
        return result;
    }

    /** Queues an event that marks this log manager as initialized. */
    public void initialize() {
        eventQueue.append(() -> {
            log.debug("initialized local log manager for node " + nodeId);
            initialized = true;
        });
    }

    /**
     * Registers {@code listener} on the event queue thread and blocks until that event has run.
     *
     * <p>If the manager is already shut down the call returns without registering. If the
     * listener is already registered an error is logged and the existing entry is kept. In both
     * the new and the duplicate case a leader election is attempted and a log check is scheduled.
     *
     * @param listener the listener to register (tracked by identity)
     * @throws RuntimeException if the manager has not been initialized, or if the calling thread
     *         is interrupted while waiting (the interrupt flag is restored)
     */
    public void register(RaftClient.Listener<ApiMessageAndVersion> listener) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        eventQueue.append(() -> {
            if (shutdown) {
                log.info("Node {}: can't register because local log manager has already been shut down.", nodeId);
                registration.complete(null);
                return;
            }
            if (!initialized) {
                log.info("Node {}: can't register because local log manager has not been initialized.", nodeId);
                registration.completeExceptionally(new RuntimeException("LocalLogManager was not initialized."));
                return;
            }
            int id = System.identityHashCode(listener);
            MetaLogListenerData previous = listeners.putIfAbsent(listener, new MetaLogListenerData(listener));
            if (previous == null) {
                log.info("Node {}: registered MetaLogListener {}", nodeId, id);
            } else {
                log.error("Node {}: can't register because listener {} already exists", nodeId, id);
            }
            shared.electLeaderIfNeeded();
            scheduleLogCheck();
            registration.complete(null);
        });
        try {
            registration.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        } catch (ExecutionException failed) {
            throw new RuntimeException(failed);
        }
    }

    /**
     * Queues an event that removes {@code listener}. Nothing is removed if the manager is already
     * shut down; an unknown listener is logged as an error. The call does not wait for the event.
     *
     * @param listener the listener to remove (matched by identity)
     */
    public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
        eventQueue.append(() -> {
            if (shutdown) {
                log.info("Node {}: can't unregister because local log manager is shutdown", nodeId);
                return;
            }
            int id = System.identityHashCode(listener);
            MetaLogListenerData removed = listeners.remove(listener);
            if (removed != null) {
                log.info("Node {}: unregistered MetaLogListener {}", nodeId, id);
            } else {
                log.error("Node {}: can't unregister because the listener {} doesn't exists", nodeId, id);
            }
        });
    }

    /**
     * Returns the offset of the last entry appended to the shared log.
     *
     * @return {@code shared.prevOffset} when it is greater than zero, otherwise empty
     */
    public synchronized OptionalLong highWatermark() {
        long lastOffset;
        // prevOffset is written under the SharedLogData monitor, so it must be read under the same
        // monitor (and only once) to get a consistent, visible value.
        synchronized (shared) {
            lastOffset = shared.prevOffset;
        }
        return lastOffset > 0L ? OptionalLong.of(lastOffset) : OptionalLong.empty();
    }

    /**
     * Appends {@code batch} non-atomically: the batch is split in two halves and every record is
     * appended on its own through {@link #scheduleAtomicAppend}.
     *
     * <p>If at least one record of the first half was written and
     * {@link #resignAfterNonAtomicCommit()} was requested, the second half is not written; this
     * node resigns instead and the offset the full batch would have ended at is returned.
     *
     * @param epoch the epoch passed through to {@link #scheduleAtomicAppend}
     * @param batch the records to append; must not be empty
     * @return the largest offset returned for the second half, or the projected end offset when
     *         resigning midway
     * @throws IllegalArgumentException if {@code batch} is empty
     */
    public long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
        if (batch.isEmpty()) {
            throw new IllegalArgumentException("Batch cannot be empty");
        }
        int midpoint = batch.size() / 2;
        List<ApiMessageAndVersion> head = batch.subList(0, midpoint);
        List<ApiMessageAndVersion> tail = batch.subList(midpoint, batch.size());
        Assertions.assertEquals(batch.size(), head.size() + tail.size());
        Assertions.assertFalse(tail.isEmpty());

        OptionalLong headOffset = appendIndividually(epoch, head);
        // The flag is consumed only when something from the first half was actually written.
        boolean resignNow = headOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false);
        if (!resignNow) {
            return appendIndividually(epoch, tail).getAsLong();
        }
        resign(leader.epoch());
        return headOffset.getAsLong() + (long) tail.size();
    }

    /** Appends each record as its own single-record batch and returns the largest offset, if any. */
    private OptionalLong appendIndividually(int epoch, List<ApiMessageAndVersion> records) {
        return records.stream()
            .mapToLong(record -> scheduleAtomicAppend(epoch, OptionalLong.empty(), Collections.singletonList(record)))
            .max();
    }

    /**
     * Appends {@code batch} to the shared log as a single record batch.
     *
     * <p>NOTE(review): the {@code epoch} argument is not consulted; the append is made with the
     * epoch of the leader currently known to this manager.
     *
     * @param epoch             unused by this implementation
     * @param requiredEndOffset forwarded to the shared log as the required base offset
     * @param batch             the records to append; must not be empty
     * @return the offset returned by {@link SharedLogData#tryAppend}
     * @throws IllegalArgumentException if {@code batch} is empty
     */
    public long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List<ApiMessageAndVersion> batch) {
        if (batch.isEmpty()) {
            throw new IllegalArgumentException("Batch cannot be empty");
        }
        int leaderEpoch = leader.epoch();
        return shared.tryAppend(nodeId, leaderEpoch, requiredEndOffset, batch);
    }

    /**
     * Resigns leadership for {@code epoch} by appending a leader change to "no leader" at the
     * next epoch.
     *
     * <p>A call for an epoch older than the current one is ignored, as is a call that the shared
     * log rejects with {@link NotLeaderException}.
     *
     * @param epoch the epoch to resign from
     * @throws IllegalArgumentException if {@code epoch} is negative or larger than the current epoch
     */
    public void resign(int epoch) {
        if (epoch < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
        }
        int currentEpoch = leaderAndEpoch().epoch();
        if (epoch > currentEpoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch + " which is larger than the current epoch " + currentEpoch);
        }
        if (epoch < currentEpoch) {
            log.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", epoch, currentEpoch);
            return;
        }
        LeaderChangeBatch resignation = new LeaderChangeBatch(new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1));
        try {
            shared.tryAppend(nodeId, currentEpoch, OptionalLong.empty(), resignation);
        } catch (NotLeaderException notLeader) {
            log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is smaller than the current epoch {}", epoch, currentEpoch);
        }
    }

    /**
     * Creates a snapshot writer for {@code snapshotId} through
     * {@link RecordsSnapshotWriter#createWithHeader}, with the raw writer supplied by
     * {@link #createNewSnapshot}.
     *
     * <p>The writer is configured with {@code MemoryPool.NONE}, a fresh {@link MockTime},
     * {@code CompressionType.NONE} and a {@link MetadataRecordSerde}. The literal 1024 is
     * presumably the maximum batch size - confirm against RecordsSnapshotWriter.
     *
     * @param snapshotId               id of the snapshot to create
     * @param lastContainedLogTimestamp forwarded unchanged to the snapshot writer
     * @return whatever {@code createWithHeader} returns
     */
    public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(OffsetAndEpoch snapshotId, long lastContainedLogTimestamp) {
        return RecordsSnapshotWriter.createWithHeader(() -> this.createNewSnapshot(snapshotId), (int)1024, (MemoryPool)MemoryPool.NONE, (Time)new MockTime(), (long)lastContainedLogTimestamp, (CompressionType)CompressionType.NONE, (RecordSerde)new MetadataRecordSerde());
    }

    /**
     * Builds a {@link MockRawSnapshotWriter} for {@code snapshotId}. Its callback wraps the
     * received buffer in a {@link MockRawSnapshotReader} and adds it to the shared log's
     * snapshots (presumably invoked when the writer is frozen - confirm against
     * MockRawSnapshotWriter).
     *
     * @return a non-empty optional holding the new raw writer
     */
    private Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId) {
        return Optional.of(new MockRawSnapshotWriter(snapshotId, buffer -> this.shared.addSnapshot((RawSnapshotReader)new MockRawSnapshotReader(snapshotId, buffer))));
    }

    /** Returns the id of the newest snapshot held by the shared log, if there is one. */
    public synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
        Optional<OffsetAndEpoch> latest = shared.latestSnapshotId();
        return latest;
    }

    /**
     * Returns the offset the next appended entry would start at.
     *
     * @return {@code shared.prevOffset + 1}
     */
    public synchronized long logEndOffset() {
        // prevOffset is written under the SharedLogData monitor; read it under the same monitor so
        // the value is visible and the non-volatile long is read atomically.
        synchronized (shared) {
            return shared.prevOffset + 1L;
        }
    }

    /** Returns the leader and epoch most recently recorded by this manager's log check. */
    public LeaderAndEpoch leaderAndEpoch() {
        return leader;
    }

    /** Returns this manager's node id; the result is always present. */
    public OptionalInt nodeId() {
        return OptionalInt.of(nodeId);
    }

    /**
     * Returns the currently registered listeners. The list is built on the event queue thread and
     * this call blocks until that event has run.
     *
     * @return a new list of the registered listeners
     * @throws RuntimeException if the event fails, or if the calling thread is interrupted while
     *         waiting (the interrupt flag is restored before throwing)
     */
    public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
        CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
        eventQueue.append(() -> future.complete(
            listeners.values().stream().map(data -> data.listener).collect(Collectors.toList())));
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers, consistent with register() and close().
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sets the highest offset log checks may deliver and schedules a new log check. The change
     * is applied on the event queue thread and this call blocks until that event has run.
     *
     * @param maxReadOffset the new read limit
     * @throws RuntimeException if the event fails, or if the calling thread is interrupted while
     *         waiting (the interrupt flag is restored before throwing)
     */
    public void setMaxReadOffset(long maxReadOffset) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
            this.maxReadOffset = maxReadOffset;
            scheduleLogCheck();
            future.complete(null);
        });
        try {
            future.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers, consistent with register() and close().
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Arms the one-shot flag that makes the next {@link #scheduleAppend} resign after writing the
     * first half of its batch.
     */
    public void resignAfterNonAtomicCommit() {
        resignAfterNonAtomicCommit.set(true);
    }

    class ShutdownEvent
    implements EventQueue.Event {
        ShutdownEvent() {
        }

        public void run() throws Exception {
            try {
                if (LocalLogManager.this.initialized && !LocalLogManager.this.shutdown) {
                    LocalLogManager.this.log.debug("Node {}: beginning shutdown.", (Object)LocalLogManager.this.nodeId);
                    LocalLogManager.this.resign(LocalLogManager.this.leader.epoch());
                    for (MetaLogListenerData listenerData : LocalLogManager.this.listeners.values()) {
                        listenerData.beginShutdown();
                    }
                    LocalLogManager.this.shared.unregisterLogManager(LocalLogManager.this);
                }
            }
            catch (Exception e) {
                LocalLogManager.this.log.error("Unexpected exception while sending beginShutdown callbacks", (Throwable)e);
            }
            LocalLogManager.this.shutdown = true;
        }
    }

    /**
     * Per-listener read state: the listener itself, the offset it has been advanced to, and the
     * last leader it was notified about. Each handle method invokes the listener callback first
     * and updates the tracked state afterwards.
     */
    private static class MetaLogListenerData {
        /** Offset of the last entry handed to (or skipped for) the listener; -1 before any. */
        private long offset = -1L;
        /** Leader most recently passed to the listener; starts as "no leader" at epoch 0. */
        private LeaderAndEpoch notifiedLeader = new LeaderAndEpoch(OptionalInt.empty(), 0);
        private final RaftClient.Listener<ApiMessageAndVersion> listener;

        MetaLogListenerData(RaftClient.Listener<ApiMessageAndVersion> listener) {
            this.listener = listener;
        }

        long offset() {
            return offset;
        }

        void setOffset(long newOffset) {
            offset = newOffset;
        }

        LeaderAndEpoch notifiedLeader() {
            return notifiedLeader;
        }

        /** Delivers a committed batch, then advances to the reader's last offset. */
        void handleCommit(MemoryBatchReader<ApiMessageAndVersion> reader) {
            listener.handleCommit(reader);
            offset = reader.lastOffset().getAsLong();
        }

        /** Delivers a snapshot, then advances to the snapshot's last contained log offset. */
        void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
            listener.handleLoadSnapshot(reader);
            offset = reader.lastContainedLogOffset();
        }

        /**
         * Notifies the listener of a leaderless state at the new epoch, then of the new leader,
         * and records both the leader and the offset of the leader change.
         */
        void handleLeaderChange(long newOffset, LeaderAndEpoch newLeader) {
            LeaderAndEpoch leaderless = new LeaderAndEpoch(OptionalInt.empty(), newLeader.epoch());
            listener.handleLeaderChange(leaderless);
            listener.handleLeaderChange(newLeader);
            notifiedLeader = newLeader;
            offset = newOffset;
        }

        void beginShutdown() {
            listener.beginShutdown();
        }
    }

    public static class SharedLogData {
        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
        /** Registered log managers keyed by node id. */
        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
        /** All appended batches, keyed by the offset of the last record in each batch. */
        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
        /** Current leader; replaced whenever a LeaderChangeBatch is appended. */
        private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
        /** Offset of the last appended entry; the constructor initializes it. */
        private long prevOffset;
        /** Read limit handed to each LocalLogManager when it is constructed. */
        private long initialMaxReadOffset = Long.MAX_VALUE;
        /** Snapshots keyed by (snapshot id offset - 1). */
        private final NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();

        /**
         * Creates the shared log. With an initial snapshot, the log starts just after it:
         * {@code prevOffset} becomes the snapshot id's offset minus one and the snapshot is stored
         * under that key. Without one, {@code prevOffset} starts at -1.
         *
         * @param snapshot optional snapshot to seed the log with
         */
        public SharedLogData(Optional<RawSnapshotReader> snapshot) {
            if (!snapshot.isPresent()) {
                prevOffset = -1L;
                return;
            }
            RawSnapshotReader initial = snapshot.get();
            prevOffset = initial.snapshotId().offset() - 1L;
            snapshots.put(prevOffset, initial);
        }

        /**
         * Adds {@code logManager} under its node id and elects a leader if none exists.
         *
         * @throws RuntimeException if a manager with the same node id was already registered
         */
        synchronized void registerLogManager(LocalLogManager logManager) {
            LocalLogManager previous = logManagers.put(logManager.nodeId, logManager);
            if (previous != null) {
                throw new RuntimeException("Can't have multiple LocalLogManagers with id " + logManager.nodeId());
            }
            electLeaderIfNeeded();
        }

        /**
         * Removes {@code logManager}, provided it is the manager registered under its node id.
         *
         * @throws RuntimeException if that exact mapping is not present
         */
        synchronized void unregisterLogManager(LocalLogManager logManager) {
            boolean removed = logManagers.remove(logManager.nodeId, logManager);
            if (removed) {
                return;
            }
            throw new RuntimeException("Log manager " + logManager.nodeId() + " was not found.");
        }

        /**
         * Wraps {@code batch} in a {@link LocalRecordBatch} and appends it on behalf of
         * {@code nodeId}. The batch's append timestamp is ten times the next offset.
         *
         * @return the offset returned by the {@link LocalBatch} overload
         */
        synchronized long tryAppend(int nodeId, int epoch, OptionalLong requiredBaseOffset, List<ApiMessageAndVersion> batch) {
            long nextOffset = prevOffset + 1L;
            LocalRecordBatch recordBatch = new LocalRecordBatch(epoch, nextOffset * 10L, batch);
            return tryAppend(nodeId, epoch, requiredBaseOffset, recordBatch);
        }

        /**
         * Appends {@code batch} if {@code nodeId} is the current leader and {@code epoch} equals
         * the current leader epoch, then elects a new leader if the append left the log without one.
         *
         * @return the offset of the last record of the appended batch
         * @throws NotLeaderException       if {@code nodeId} is not the leader or {@code epoch} is stale
         * @throws IllegalArgumentException if {@code epoch} is larger than the current epoch
         */
        synchronized long tryAppend(int nodeId, int epoch, OptionalLong requiredBaseOffset, LocalBatch batch) {
            if (!leader.isLeader(nodeId)) {
                log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not match the current leader id of {}.", nodeId, epoch, leader.leaderId());
                throw new NotLeaderException("Append failed because the replication is not the current leader");
            }
            int currentEpoch = leader.epoch();
            if (epoch < currentEpoch) {
                throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. Current leader epoch = " + currentEpoch);
            }
            if (epoch > currentEpoch) {
                throw new IllegalArgumentException("Attempt to append from epoch " + epoch + " which is larger than the current epoch " + currentEpoch);
            }
            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
            long endOffset = append(requiredBaseOffset, batch);
            electLeaderIfNeeded();
            return endOffset;
        }

        /**
         * Appends {@code batch} unconditionally (no leader or epoch check), stores it under the
         * offset of its last record, applies a leader change if the batch is one, and schedules a
         * log check on every registered log manager.
         *
         * @param requiredBaseOffset if present, must equal the next base offset ({@code prevOffset + 1})
         * @param batch              the batch to append
         * @return the offset of the last record of the appended batch
         * @throws UnexpectedBaseOffsetException if {@code requiredBaseOffset} is present and does
         *         not match the next base offset
         */
        public synchronized long append(OptionalLong requiredBaseOffset, LocalBatch batch) {
            long nextBaseOffset = prevOffset + 1L;
            long nextEndOffset = prevOffset + (long) batch.size();
            if (requiredBaseOffset.isPresent() && requiredBaseOffset.getAsLong() != nextBaseOffset) {
                // Report the offset that was actually compared against, not the batch's end offset.
                throw new UnexpectedBaseOffsetException("Wanted base offset " + requiredBaseOffset.getAsLong() + ", but the next offset was " + nextBaseOffset);
            }
            log.debug("append(batch={}, nextEndOffset={})", batch, nextEndOffset);
            batches.put(nextEndOffset, batch);
            if (batch instanceof LeaderChangeBatch) {
                leader = ((LeaderChangeBatch) batch).newLeader;
            }
            // Log checks run on each manager's own event queue and read the log through
            // synchronized accessors, so they only observe the state after this method returns.
            for (LocalLogManager logManager : logManagers.values()) {
                logManager.scheduleLogCheck();
            }
            prevOffset = nextEndOffset;
            return nextEndOffset;
        }

        /**
         * If there is no leader and at least one log manager is registered, picks one of the
         * registered node ids at random and appends a leader change naming it leader of the next
         * epoch. Otherwise does nothing.
         */
        synchronized void electLeaderIfNeeded() {
            if (leader.leaderId().isPresent() || logManagers.isEmpty()) {
                return;
            }
            int remainingSkips = ThreadLocalRandom.current().nextInt(logManagers.size());
            Iterator<Integer> nodeIds = logManagers.keySet().iterator();
            Integer chosenNode = nodeIds.next();
            while (remainingSkips > 0) {
                chosenNode = nodeIds.next();
                --remainingSkips;
            }
            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(chosenNode), leader.epoch() + 1);
            log.info("Elected new leader: {}.", newLeader);
            append(OptionalLong.empty(), new LeaderChangeBatch(newLeader));
        }

        /** Returns the shared log's current leader and epoch. */
        synchronized LeaderAndEpoch leaderAndEpoch() {
            return leader;
        }

        /**
         * Returns an immutable copy of the first batch entry whose key is strictly greater than
         * {@code offset}, or {@code null} when there is none.
         */
        synchronized Map.Entry<Long, LocalBatch> nextBatch(long offset) {
            Map.Entry<Long, LocalBatch> next = batches.higherEntry(offset);
            if (next != null) {
                return new AbstractMap.SimpleImmutableEntry<Long, LocalBatch>(next.getKey(), next.getValue());
            }
            return null;
        }

        /**
         * Returns the newest snapshot when its key is at or beyond {@code offset}; otherwise, or
         * when there are no snapshots, returns empty.
         */
        synchronized Optional<RawSnapshotReader> nextSnapshot(long offset) {
            Map.Entry<Long, RawSnapshotReader> newest = snapshots.lastEntry();
            if (newest == null || offset > newest.getKey()) {
                return Optional.empty();
            }
            return Optional.of(newest.getValue());
        }

        /**
         * Stores {@code newSnapshot} under (snapshot id offset - 1) and wakes threads waiting on
         * this object. A snapshot that reaches past the last appended offset is logged and ignored.
         */
        synchronized void addSnapshot(RawSnapshotReader newSnapshot) {
            long snapshotKey = newSnapshot.snapshotId().offset() - 1L;
            if (snapshotKey > prevOffset) {
                log.error("Ignored attempt to add a snapshot {} that is greater than the latest offset {}", newSnapshot, prevOffset);
                return;
            }
            snapshots.put(snapshotKey, newSnapshot);
            notifyAll();
        }

        /**
         * Blocks until a snapshot stored under the key {@code committedOffset} exists, then returns it.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized RawSnapshotReader waitForSnapshot(long committedOffset) throws InterruptedException {
            RawSnapshotReader found = snapshots.get(committedOffset);
            while (found == null) {
                wait();
                found = snapshots.get(committedOffset);
            }
            return found;
        }

        /**
         * Blocks until at least one snapshot exists, then returns the one with the highest key.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized RawSnapshotReader waitForLatestSnapshot() throws InterruptedException {
            while (snapshots.isEmpty()) {
                wait();
            }
            Map.Entry<Long, RawSnapshotReader> newest = snapshots.lastEntry();
            return Objects.requireNonNull(newest).getValue();
        }

        /** Returns the id of the snapshot with the highest key, or empty when there are no snapshots. */
        synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
            Map.Entry<Long, RawSnapshotReader> newest = snapshots.lastEntry();
            if (newest == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(newest.getValue().snapshotId());
        }

        /**
         * Returns the total size of all records in the appended record batches, using the size
         * reported by {@code LocalLogManager.messageSize} for each record. Leader change batches
         * contribute nothing.
         *
         * @return the sum of the record sizes, accumulated as a {@code long}
         */
        synchronized long appendedBytes() {
            ObjectSerializationCache objectCache = new ObjectSerializationCache();
            // Accumulate in a long: summing in an int stream could overflow before widening.
            long totalBytes = 0L;
            for (LocalBatch batch : batches.values()) {
                if (batch instanceof LocalRecordBatch) {
                    for (ApiMessageAndVersion record : ((LocalRecordBatch) batch).records) {
                        totalBytes += LocalLogManager.messageSize(record, objectCache);
                    }
                }
            }
            return totalBytes;
        }

        /**
         * Sets the read limit that log managers pick up when they are constructed.
         *
         * @return this instance, for chaining
         */
        public SharedLogData setInitialMaxReadOffset(long initialMaxReadOffset) {
            this.initialMaxReadOffset = initialMaxReadOffset;
            return this;
        }

        /** Returns the read limit that newly constructed log managers start with. */
        public long initialMaxReadOffset() {
            return initialMaxReadOffset;
        }

        /**
         * Returns the records of every appended record batch, in offset order, as a new list.
         * Leader change batches are skipped.
         */
        public synchronized List<ApiMessageAndVersion> allRecords() {
            List<ApiMessageAndVersion> collected = new ArrayList<>();
            for (LocalBatch batch : batches.values()) {
                if (batch instanceof LocalRecordBatch) {
                    collected.addAll(((LocalRecordBatch) batch).records);
                }
            }
            return collected;
        }
    }

    /**
     * A batch of metadata records together with the leader epoch and append timestamp it was
     * written with. The record list is stored as given, without copying.
     */
    public static class LocalRecordBatch implements LocalBatch {
        private final int leaderEpoch;
        private final long appendTimestamp;
        private final List<ApiMessageAndVersion> records;

        public LocalRecordBatch(int leaderEpoch, long appendTimestamp, List<ApiMessageAndVersion> records) {
            this.leaderEpoch = leaderEpoch;
            this.appendTimestamp = appendTimestamp;
            this.records = records;
        }

        /** Returns the leader epoch this batch was created with. */
        @Override
        public int epoch() {
            return leaderEpoch;
        }

        /** Returns the number of records in the batch. */
        @Override
        public int size() {
            return records.size();
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof LocalRecordBatch) {
                LocalRecordBatch that = (LocalRecordBatch) o;
                return leaderEpoch == that.leaderEpoch
                    && appendTimestamp == that.appendTimestamp
                    && Objects.equals(records, that.records);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leaderEpoch, appendTimestamp, records);
        }

        @Override
        public String toString() {
            return String.format("LocalRecordBatch(leaderEpoch=%s, appendTimestamp=%s, records=%s)", leaderEpoch, appendTimestamp, records);
        }
    }

    /** A single-entry batch that records a change of leader. */
    public static class LeaderChangeBatch implements LocalBatch {
        private final LeaderAndEpoch newLeader;

        public LeaderChangeBatch(LeaderAndEpoch newLeader) {
            this.newLeader = newLeader;
        }

        /** Returns the epoch of the new leader. */
        @Override
        public int epoch() {
            return newLeader.epoch();
        }

        /** A leader change always occupies exactly one offset. */
        @Override
        public int size() {
            return 1;
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof LeaderChangeBatch) {
                LeaderChangeBatch that = (LeaderChangeBatch) o;
                return that.newLeader.equals(newLeader);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(newLeader);
        }

        @Override
        public String toString() {
            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
        }
    }

    /** An entry in the shared log: either a record batch or a leader change. */
    interface LocalBatch {
        /** Returns the epoch associated with this batch. */
        int epoch();

        /** Returns the number of offsets this batch occupies in the log. */
        int size();
    }
}

