/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.appenderator;

import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Stopwatch;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.primitives.Ints;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.io.druid.client.cache.Cache;
import org.apache.hive.druid.io.druid.client.cache.CacheConfig;
import org.apache.hive.druid.io.druid.common.guava.ThreadRenamingCallable;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.IAE;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.RetryUtils;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.io.druid.java.util.common.io.Closer;
import org.apache.hive.druid.io.druid.java.util.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.hive.druid.io.druid.query.QuerySegmentWalker;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.segment.IndexIO;
import org.apache.hive.druid.io.druid.segment.IndexMerger;
import org.apache.hive.druid.io.druid.segment.IndexSpec;
import org.apache.hive.druid.io.druid.segment.QueryableIndex;
import org.apache.hive.druid.io.druid.segment.QueryableIndexSegment;
import org.apache.hive.druid.io.druid.segment.Segment;
import org.apache.hive.druid.io.druid.segment.incremental.IndexSizeExceededException;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.loading.DataSegmentPusher;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.FireHydrant;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderator;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Committed;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Sink;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import org.joda.time.ReadablePeriod;

public class AppenderatorImpl
implements Appenderator {
    private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
    // NOTE(review): presumably the back-pressure delay, in milliseconds, above which a warning is logged -- confirm.
    private static final int WARN_DELAY = 1000;
    // Name of the file inside each sink directory from which the sink's SegmentIdentifier is read at bootstrap.
    private static final String IDENTIFIER_FILE_NAME = "identifier.json";
    private final DataSchema schema;
    private final AppenderatorConfig tuningConfig;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentPusher dataSegmentPusher;
    private final ObjectMapper objectMapper;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final IndexIO indexIO;
    private final IndexMerger indexMerger;
    // May be null; the constructor only requires it to be non-null when a conglomerate is supplied.
    private final Cache cache;
    // Sinks currently managed by this appenderator, keyed by segment identifier.
    private final Map<SegmentIdentifier, Sink> sinks = new ConcurrentHashMap<SegmentIdentifier, Sink>();
    // Identifiers that push() skips because they are in the process of being dropped.
    private final Set<SegmentIdentifier> droppingSinks = Sets.newConcurrentHashSet();
    // Timeline of sinks; handed to the query walker and updated whenever a sink is created or loaded from disk.
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline(String.CASE_INSENSITIVE_ORDER);
    // Query entry point; null when the appenderator was constructed without a conglomerate.
    private final QuerySegmentWalker texasRanger;
    // Running count of rows held in memory: increased by add(), decreased when a persist is submitted.
    private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
    // Running count of all rows added through add().
    private final AtomicInteger totalRows = new AtomicInteger();
    // Held while the commit file is read or written.
    private final Lock commitLock = new ReentrantLock();
    // Executors are created by initializeExecutors() and reset to null by close()/closeNow().
    private volatile ListeningExecutorService persistExecutor = null;
    private volatile ListeningExecutorService pushExecutor = null;
    private volatile ListeningExecutorService intermediateTempExecutor = null;
    // Epoch millis after which add() considers a time-based persist to be due.
    private volatile long nextFlush;
    // Lock on the base persist directory's lock file, together with the channel that owns it.
    private volatile FileLock basePersistDirLock = null;
    private volatile FileChannel basePersistDirLockChannel = null;
    // Flipped to true by the first call to close() or closeNow(); later calls return immediately.
    private AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates an appenderator for the data source described by {@code schema}.
     *
     * <p>All collaborators except {@code conglomerate}, {@code emitter}, {@code queryExecutorService},
     * {@code cache} and {@code cacheConfig} are null-checked here. When {@code conglomerate} is null no
     * query walker is created and the query methods of this class throw; otherwise {@code cache} must be
     * non-null as well.
     */
    public AppenderatorImpl(
            DataSchema schema,
            AppenderatorConfig tuningConfig,
            FireDepartmentMetrics metrics,
            DataSegmentPusher dataSegmentPusher,
            ObjectMapper objectMapper,
            QueryRunnerFactoryConglomerate conglomerate,
            DataSegmentAnnouncer segmentAnnouncer,
            ServiceEmitter emitter,
            ExecutorService queryExecutorService,
            IndexIO indexIO,
            IndexMerger indexMerger,
            Cache cache,
            CacheConfig cacheConfig) {
        this.schema = Preconditions.checkNotNull(schema, "schema");
        this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics");
        this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
        this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
        this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
        this.cache = cache;

        if (conglomerate == null) {
            // Without a conglomerate there is nothing to serve queries with.
            this.texasRanger = null;
        } else {
            this.texasRanger = new SinkQuerySegmentWalker(
                    schema.getDataSource(),
                    this.sinkTimeline,
                    objectMapper,
                    emitter,
                    conglomerate,
                    queryExecutorService,
                    Preconditions.checkNotNull(cache, "cache"),
                    cacheConfig);
        }

        log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
    }

    /** Returns the data source name taken from this appenderator's schema. */
    @Override
    public String getDataSource() {
        final String dataSource = schema.getDataSource();
        return dataSource;
    }

    /**
     * Prepares this appenderator for use: creates the base persist directory, locks it, reloads any
     * sinks found on disk, creates the executors and schedules the first time-based flush.
     *
     * @return the metadata of the commit found on disk, as returned by the bootstrap step (may be null)
     */
    @Override
    public Object startJob() {
        final File basePersistDir = tuningConfig.getBasePersistDirectory();
        basePersistDir.mkdirs();
        lockBasePersistDirectory();

        final Object restoredMetadata = bootstrapSinksFromDisk();

        initializeExecutors();
        resetNextFlush();
        return restoredMetadata;
    }

    /**
     * Adds a row to the sink for {@code identifier}, creating the sink if necessary.
     *
     * <p>A persist becomes due when the sink can take no more rows, the flush deadline has passed, or
     * the in-memory row count has reached the configured maximum. If incremental persists are allowed
     * the persist is started here; otherwise the result merely reports that one is required.
     *
     * @param identifier               segment to add to; must belong to this appenderator's data source
     * @param row                      row to add
     * @param committerSupplier        source of the committer used for an incremental persist; may be null
     * @param allowIncrementalPersists whether this call may trigger a persist itself
     * @return the identifier, the sink's row count and whether the caller must persist
     * @throws IndexSizeExceededException  if the sink reports that it is full
     * @throws SegmentNotWritableException if the sink reports a negative row count for the add
     */
    @Override
    public Appenderator.AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, @Nullable Supplier<Committer> committerSupplier, boolean allowIncrementalPersists) throws IndexSizeExceededException, SegmentNotWritableException {
        if (!identifier.getDataSource().equals(schema.getDataSource())) {
            throw new IAE("Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", schema.getDataSource(), identifier.getDataSource());
        }

        final Sink targetSink = getOrCreateSink(identifier);
        metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());

        final int inMemoryBefore = targetSink.getNumRowsInMemory();
        final int inMemoryAfter;
        try {
            inMemoryAfter = targetSink.add(row, !allowIncrementalPersists);
        }
        catch (IndexSizeExceededException e) {
            log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
            throw e;
        }
        if (inMemoryAfter < 0) {
            throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
        }

        final int addedRows = inMemoryAfter - inMemoryBefore;
        rowsCurrentlyInMemory.addAndGet(addedRows);
        totalRows.addAndGet(addedRows);

        final boolean persistDue = !targetSink.canAppendRow()
                || System.currentTimeMillis() > nextFlush
                || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory();

        if (persistDue && allowIncrementalPersists) {
            persistAll(committerSupplier == null ? null : committerSupplier.get());
        }
        // The caller only has to persist when a persist is due and we were not allowed to do it here.
        final boolean isPersistRequired = persistDue && !allowIncrementalPersists;

        return new Appenderator.AppenderatorAddResult(identifier, targetSink.getNumRows(), isPersistRequired);
    }

    /** Returns an immutable snapshot of the identifiers of all sinks currently held. */
    @Override
    public List<SegmentIdentifier> getSegments() {
        final Set<SegmentIdentifier> identifiers = sinks.keySet();
        return ImmutableList.copyOf(identifiers);
    }

    /**
     * Returns the number of rows in the sink for {@code identifier}.
     *
     * @throws ISE if there is no sink for the identifier
     */
    @Override
    public int getRowCount(SegmentIdentifier identifier) {
        final Sink found = sinks.get(identifier);
        if (found != null) {
            return found.getNumRows();
        }
        throw new ISE("No such sink: %s", identifier);
    }

    /** Returns the current value of the total-rows counter maintained by {@code add}. */
    @Override
    public int getTotalRowCount() {
        final int total = totalRows.get();
        return total;
    }

    /** Returns the current value of the in-memory row counter; exposed for tests. */
    @VisibleForTesting
    int getRowsInMemory() {
        final int inMemory = rowsCurrentlyInMemory.get();
        return inMemory;
    }

    /**
     * Returns the sink registered for {@code identifier}, creating, announcing and registering a new
     * one when none exists yet. An announcement failure is reported as an alert and does not prevent
     * the sink from being registered.
     */
    private Sink getOrCreateSink(SegmentIdentifier identifier) {
        final Sink existing = sinks.get(identifier);
        if (existing != null) {
            return existing;
        }

        final Sink created = new Sink(
                identifier.getInterval(),
                schema,
                identifier.getShardSpec(),
                identifier.getVersion(),
                tuningConfig.getMaxRowsInMemory(),
                tuningConfig.isReportParseExceptions());

        try {
            segmentAnnouncer.announceSegment(created.getSegment());
        }
        catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
                    .addData("interval", created.getInterval())
                    .emit();
        }

        sinks.put(identifier, created);
        metrics.setSinkCount(sinks.size());
        sinkTimeline.add(created.getInterval(), created.getVersion(), identifier.getShardSpec().createChunk(created));
        return created;
    }

    /**
     * Delegates to the query walker to obtain a runner for the given intervals.
     *
     * @throws IllegalStateException if this appenderator was built without a query walker
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        if (texasRanger != null) {
            return texasRanger.getQueryRunnerForIntervals(query, intervals);
        }
        throw new IllegalStateException("Don't query me, bro.");
    }

    /**
     * Delegates to the query walker to obtain a runner for the given segment descriptors.
     *
     * @throws IllegalStateException if this appenderator was built without a query walker
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        if (texasRanger != null) {
            return texasRanger.getQueryRunnerForSegments(query, specs);
        }
        throw new IllegalStateException("Don't query me, bro.");
    }

    /**
     * Discards all state: overwrites the commit file with an empty commit (on the persist executor, so
     * it is ordered after already-submitted persists) and then abandons every sink, removing its
     * on-disk data. Blocks until all of that has completed. Does nothing if the persist executor has
     * not been created.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void clear() throws InterruptedException {
        if (this.persistExecutor == null) {
            return;
        }
        try {
            final Future<Object> uncommitFuture = this.persistExecutor.submit(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    // Acquire the lock BEFORE entering try: if lock() itself failed, the finally
                    // block must not attempt to unlock a lock this thread does not hold.
                    AppenderatorImpl.this.commitLock.lock();
                    try {
                        AppenderatorImpl.this.objectMapper.writeValue(AppenderatorImpl.this.computeCommitFile(), Committed.nil());
                    }
                    finally {
                        AppenderatorImpl.this.commitLock.unlock();
                    }
                    return null;
                }
            });
            // Wait for the uncommit to land before tearing down the sinks.
            uncommitFuture.get();

            final List<ListenableFuture<?>> futures = Lists.newArrayList();
            for (Map.Entry<SegmentIdentifier, Sink> entry : this.sinks.entrySet()) {
                futures.add(this.abandonSegment(entry.getKey(), entry.getValue(), true));
            }
            Futures.allAsList(futures).get();
        }
        catch (ExecutionException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Abandons the sink for {@code identifier}, removing its on-disk data.
     *
     * @return the future of the abandonment, or an already-completed future if no such sink exists
     */
    @Override
    public ListenableFuture<?> drop(SegmentIdentifier identifier) {
        final Sink target = sinks.get(identifier);
        if (target == null) {
            return Futures.immediateFuture(null);
        }
        return abandonSegment(identifier, target, true);
    }

    /**
     * Submits an asynchronous persist of the in-memory data of the given segments.
     *
     * <p>On the calling thread this collects every hydrant that has not been swapped to disk yet
     * (swapping the current hydrant of each swappable sink), then hands them to the persist executor.
     * The submitted task persists the hydrants and, when a committer is given, runs it and merges the
     * per-sink hydrant counts into the commit file under {@code commitLock}.
     *
     * @param identifiers segments to persist; each must have a sink
     * @param committer   committer to run after the hydrants are persisted; may be null
     * @return future yielding the committer's metadata, or null when no committer was given
     * @throws ISE if any identifier has no sink
     */
    @Override
    public ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, final @Nullable Committer committer) {
        final Map<String, Integer> currentHydrants = Maps.newHashMap();
        final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
        int numPersistedRows = 0;
        for (SegmentIdentifier identifier : identifiers) {
            final Sink sink = this.sinks.get(identifier);
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", identifier);
            }
            final List<FireHydrant> hydrants = Lists.newArrayList(sink);
            currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
            numPersistedRows += sink.getNumRowsInMemory();
            // For a writable sink the last hydrant is the live one; it is handled by swap() below.
            final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
            for (FireHydrant hydrant : hydrants.subList(0, limit)) {
                if (hydrant.hasSwapped()) {
                    continue;
                }
                log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
                indexesToPersist.add(Pair.of(hydrant, identifier));
            }
            if (sink.swappable()) {
                indexesToPersist.add(Pair.of(sink.swap(), identifier));
            }
        }
        log.info("Submitting persist runnable for dataSource[%s]", this.schema.getDataSource());
        final String threadName = StringUtils.format("%s-incremental-persist", this.schema.getDataSource());
        // Capture the metadata now, on the caller's thread, rather than when the task eventually runs.
        final Object commitMetadata = committer == null ? null : committer.getMetadata();
        final Stopwatch runExecStopwatch = Stopwatch.createStarted();
        final Stopwatch persistStopwatch = Stopwatch.createStarted();
        final ListenableFuture<Object> future = this.persistExecutor.submit(new ThreadRenamingCallable<Object>(threadName){

            @Override
            public Object doCall() {
                try {
                    for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
                        AppenderatorImpl.this.metrics.incrementRowOutputCount(AppenderatorImpl.this.persistHydrant(pair.lhs, pair.rhs));
                    }
                    if (committer != null) {
                        log.info("Committing metadata[%s] for sinks[%s].", commitMetadata, Joiner.on(", ").join(currentHydrants.entrySet().stream().map(entry -> StringUtils.format("%s:%d", entry.getKey(), entry.getValue())).collect(Collectors.toList())));
                        committer.run();
                        // Lock before try so that finally never unlocks a lock that was not acquired.
                        AppenderatorImpl.this.commitLock.lock();
                        try {
                            final Map<String, Integer> commitHydrants = Maps.newHashMap();
                            final Committed oldCommit = AppenderatorImpl.this.readCommit();
                            if (oldCommit != null) {
                                // Keep hydrant counts of sinks that are not part of this persist.
                                commitHydrants.putAll(oldCommit.getHydrants());
                            }
                            commitHydrants.putAll(currentHydrants);
                            AppenderatorImpl.this.writeCommit(new Committed(commitHydrants, commitMetadata));
                        }
                        finally {
                            AppenderatorImpl.this.commitLock.unlock();
                        }
                    }
                    return commitMetadata;
                }
                catch (Exception e) {
                    AppenderatorImpl.this.metrics.incrementFailedPersists();
                    throw Throwables.propagate(e);
                }
                finally {
                    AppenderatorImpl.this.metrics.incrementNumPersists();
                    AppenderatorImpl.this.metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
                    persistStopwatch.stop();
                }
            }
        });
        // Time spent inside submit() is how long the caller was held back by pending persists.
        final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > WARN_DELAY) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
        }
        runExecStopwatch.stop();
        this.resetNextFlush();
        this.rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
        return future;
    }

    /** Persists every sink currently held; equivalent to {@code persist} over all known identifiers. */
    @Override
    public ListenableFuture<Object> persistAll(@Nullable Committer committer) {
        final Collection<SegmentIdentifier> everySegment = sinks.keySet();
        return persist(everySegment, committer);
    }

    /**
     * Marks the given segments as finished, persists them, and then merges and pushes each one on the
     * push executor.
     *
     * <p>Sinks that are currently being dropped, and sinks for which the merge-and-push step yields
     * null, are left out of the result.
     *
     * @param identifiers segments to push; each must have a sink
     * @param committer   committer handed to the persist step; may be null
     * @return future yielding the pushed segments together with the persisted commit metadata
     * @throws ISE if any identifier has no sink
     */
    @Override
    public ListenableFuture<SegmentsAndMetadata> push(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer) {
        final Map<SegmentIdentifier, Sink> theSinks = Maps.newHashMap();
        for (SegmentIdentifier identifier : identifiers) {
            final Sink sink = this.sinks.get(identifier);
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", identifier);
            }
            theSinks.put(identifier, sink);
            sink.finishWriting();
        }
        // The explicit Function type pins the intended transform overload for the lambda.
        return Futures.transform(this.persist(identifiers, committer), (Function<Object, SegmentsAndMetadata>)commitMetadata -> {
            final List<DataSegment> dataSegments = Lists.newArrayList();
            for (Map.Entry<SegmentIdentifier, Sink> entry : theSinks.entrySet()) {
                if (this.droppingSinks.contains(entry.getKey())) {
                    log.info("Skipping push of currently-dropping sink[%s]", entry.getKey());
                    continue;
                }
                final DataSegment dataSegment = this.mergeAndPush(entry.getKey(), entry.getValue());
                if (dataSegment != null) {
                    dataSegments.add(dataSegment);
                } else {
                    log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey());
                }
            }
            return new SegmentsAndMetadata(dataSegments, commitMetadata);
        }, this.pushExecutor);
    }

    /**
     * Submits to {@code intermediateTempExecutor} a task that in turn submits an empty task to
     * {@code pushExecutor}, and returns the future of the outer submission.
     *
     * <p>NOTE(review): presumably used as a barrier that completes once work queued ahead of it on the
     * push executor has been accepted -- confirm against callers. Which {@code submit} overload the
     * outer lambda binds to (Callable vs Runnable) affects the value the returned future yields.
     */
    private ListenableFuture<?> pushBarrier() {
        return this.intermediateTempExecutor.submit(() -> this.pushExecutor.submit(() -> {}));
    }

    /**
     * Merges all hydrants of {@code sink} into a single index under the sink's persist directory and
     * pushes it through the segment pusher, recording the pushed segment in a descriptor file.
     *
     * <p>If the descriptor file already exists the segment is treated as already pushed and the
     * descriptor's contents are returned without merging again.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param identifier segment being pushed
     * @param sink       sink expected to be registered for {@code identifier}; must be non-writable and
     *                   have every hydrant swapped
     * @return the pushed segment, or null if {@code sink} is no longer the registered sink
     * @throws ISE if the sink is still writable, a hydrant has not been swapped, or the merge target
     *             still exists after removal
     */
    private DataSegment mergeAndPush(SegmentIdentifier identifier, Sink sink) {
        // Identity comparison: the sink may have been dropped or replaced since the push was requested.
        if (this.sinks.get(identifier) != sink) {
            log.warn("Sink for segment[%s] no longer valid, bailing out of mergeAndPush.", identifier);
            return null;
        }
        File persistDir = this.computePersistDir(identifier);
        File mergedTarget = new File(persistDir, "merged");
        File descriptorFile = this.computeDescriptorFile(identifier);
        // Sanity checks; note they only run when the sink has at least one hydrant.
        for (FireHydrant hydrant : sink) {
            if (sink.isWritable()) {
                throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier);
            }
            FireHydrant fireHydrant = hydrant;
            synchronized (fireHydrant) {
                if (!hydrant.hasSwapped()) {
                    throw new ISE("WTF?! Expected sink to be fully persisted before mergeAndPush. Segment[%s].", identifier);
                }
            }
        }
        try {
            File mergedFile;
            if (descriptorFile.exists()) {
                log.info("Segment[%s] already pushed.", identifier);
                return this.objectMapper.readValue(descriptorFile, DataSegment.class);
            }
            log.info("Pushing merged index for segment[%s].", identifier);
            // Clear any leftover output of an earlier merge attempt.
            this.removeDirectory(mergedTarget);
            if (mergedTarget.exists()) {
                throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
            }
            ArrayList<QueryableIndex> indexes = Lists.newArrayList();
            // The closer releases every segment reference taken below once the merge has finished.
            try (Closer closer = Closer.create();){
                for (FireHydrant fireHydrant : sink) {
                    Pair<Segment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
                    QueryableIndex queryableIndex = ((Segment)segmentAndCloseable.lhs).asQueryableIndex();
                    log.info("Adding hydrant[%s]", fireHydrant);
                    indexes.add(queryableIndex);
                    closer.register((Closeable)segmentAndCloseable.rhs);
                }
                mergedFile = this.indexMerger.mergeQueryableIndex(indexes, this.schema.getGranularitySpec().isRollup(), this.schema.getAggregators(), mergedTarget, this.tuningConfig.getIndexSpec(), this.tuningConfig.getSegmentWriteOutMediumFactory());
            }
            // Retry the push on any Exception; the literal 5 is passed to RetryUtils.retry as its last argument.
            DataSegment segment = RetryUtils.retry(() -> this.dataSegmentPusher.push(mergedFile, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)), true), exception -> exception instanceof Exception, 5);
            // The descriptor file is what marks this segment as "already pushed" on a later call.
            this.objectMapper.writeValue(descriptorFile, (Object)segment);
            log.info("Pushed merged index for segment[%s], descriptor is: %s", identifier, segment);
            return segment;
        }
        catch (Exception e) {
            this.metrics.incrementFailedHandoffs();
            log.warn(e, "Failed to push merged index for segment[%s].", identifier);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Shuts this appenderator down gracefully: abandons every sink while keeping its on-disk data,
     * shuts down and awaits all three executors, and releases the base persist directory lock.
     * Only the first call to {@code close()} or {@code closeNow()} has any effect.
     *
     * @throws ISE if interrupted while waiting for the executors to terminate; the interrupt flag is
     *             restored and the InterruptedException is attached as the cause
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            log.info("Appenderator already closed");
            return;
        }
        log.info("Shutting down...");
        final List<ListenableFuture<?>> futures = Lists.newArrayList();
        for (Map.Entry<SegmentIdentifier, Sink> entry : this.sinks.entrySet()) {
            futures.add(this.abandonSegment(entry.getKey(), entry.getValue(), false));
        }
        try {
            Futures.allAsList(futures).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn(e, "Interrupted during close()");
        }
        catch (ExecutionException e) {
            // Best effort: a failed abandonment must not prevent the executors from being shut down.
            log.warn(e, "Unable to abandon existing segments during close()");
        }
        try {
            this.shutdownExecutors();
            Preconditions.checkState(this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.pushExecutor == null || this.pushExecutor.awaitTermination(365L, TimeUnit.DAYS), "pushExecutor not terminated");
            Preconditions.checkState(this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS), "intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.pushExecutor = null;
            this.intermediateTempExecutor = null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the interrupt as the cause so the failure can be diagnosed.
            throw new ISE(e, "Failed to shutdown executors during close()");
        }
        this.unlockBasePersistDirectory();
    }

    /**
     * Shuts this appenderator down immediately: unannounces every sink's segment and shuts down the
     * executors, waiting only for the persist and intermediate executors to terminate.
     *
     * <p>Unlike {@code close()}, this method does not abandon the sinks, does not wait for the push
     * executor to terminate, and does not release the base persist directory lock. Only the first
     * call to {@code close()} or {@code closeNow()} has any effect.
     *
     * @throws ISE if interrupted while waiting for the executors to terminate; the interrupt flag is
     *             restored and the InterruptedException is attached as the cause
     */
    @Override
    public void closeNow() {
        if (!this.closed.compareAndSet(false, true)) {
            log.info("Appenderator already closed");
            return;
        }
        log.info("Shutting down immediately...");
        for (Map.Entry<SegmentIdentifier, Sink> entry : this.sinks.entrySet()) {
            try {
                this.segmentAnnouncer.unannounceSegment(entry.getValue().getSegment());
            }
            catch (Exception e) {
                // Best effort: report and continue with the remaining sinks.
                log.makeAlert(e, "Failed to unannounce segment[%s]", this.schema.getDataSource()).addData("identifier", entry.getKey().getIdentifierAsString()).emit();
            }
        }
        try {
            this.shutdownExecutors();
            // pushExecutor is shut down above but deliberately not awaited or reset here.
            Preconditions.checkState(this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS), "intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.intermediateTempExecutor = null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the interrupt as the cause, and name the method that actually failed.
            throw new ISE(e, "Failed to shutdown executors during closeNow()");
        }
    }

    /**
     * Acquires an exclusive file lock on the base persist directory's lock file, unless this instance
     * already holds one. The lock and its channel are stored in fields only once the lock has been
     * obtained; on any failure the freshly opened channel is closed so that it does not leak.
     *
     * @throws ISE if the lock is held elsewhere ({@code tryLock} returned null)
     */
    private void lockBasePersistDirectory() {
        if (this.basePersistDirLock != null) {
            return;
        }
        FileChannel channel = null;
        FileLock lock = null;
        try {
            channel = FileChannel.open(this.computeLockFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            lock = channel.tryLock();
            if (lock == null) {
                throw new ISE("Cannot acquire lock on basePersistDir: %s", this.computeLockFile());
            }
            this.basePersistDirLockChannel = channel;
            this.basePersistDirLock = lock;
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
        finally {
            // Reached without a lock only when acquisition failed or threw: release the channel.
            if (lock == null && channel != null) {
                try {
                    channel.close();
                }
                catch (IOException closeFailure) {
                    log.warn(closeFailure, "Failed to close lock file channel after unsuccessful lock attempt");
                }
            }
        }
    }

    /**
     * Releases the base persist directory lock, if held, and closes its channel.
     *
     * <p>The channel is closed even when releasing the lock throws, and both fields are cleared in
     * every case so that no stale lock or closed channel is left referenced.
     */
    private void unlockBasePersistDirectory() {
        final FileLock lock = this.basePersistDirLock;
        if (lock == null) {
            return;
        }
        final FileChannel channel = this.basePersistDirLockChannel;
        try {
            try {
                lock.release();
            }
            finally {
                if (channel != null) {
                    channel.close();
                }
            }
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
        finally {
            this.basePersistDirLock = null;
            this.basePersistDirLockChannel = null;
        }
    }

    /**
     * Creates whichever of the three single-threaded executors do not exist yet. The persist executor's
     * capacity comes from the tuning config; the push executor uses 1 and the intermediate executor 0.
     */
    private void initializeExecutors() {
        final int persistQueueCapacity = tuningConfig.getMaxPendingPersists();

        if (persistExecutor == null) {
            final ExecutorService persistDelegate = Execs.newBlockingSingleThreaded("appenderator_persist_%d", persistQueueCapacity);
            persistExecutor = MoreExecutors.listeningDecorator(persistDelegate);
        }

        if (pushExecutor == null) {
            final ExecutorService pushDelegate = Execs.newBlockingSingleThreaded("appenderator_merge_%d", 1);
            pushExecutor = MoreExecutors.listeningDecorator(pushDelegate);
        }

        if (intermediateTempExecutor == null) {
            final ExecutorService intermediateDelegate = Execs.newBlockingSingleThreaded("appenderator_abandon_%d", 0);
            intermediateTempExecutor = MoreExecutors.listeningDecorator(intermediateDelegate);
        }
    }

    /** Calls {@code shutdownNow()} on each executor that has been created; does not wait for termination. */
    private void shutdownExecutors() {
        final ListeningExecutorService persist = persistExecutor;
        if (persist != null) {
            persist.shutdownNow();
        }

        final ListeningExecutorService push = pushExecutor;
        if (push != null) {
            push.shutdownNow();
        }

        final ListeningExecutorService intermediate = intermediateTempExecutor;
        if (intermediate != null) {
            intermediate.shutdownNow();
        }
    }

    /** Sets the next flush deadline to the current UTC time plus the configured intermediate persist period. */
    private void resetNextFlush() {
        final ReadablePeriod persistPeriod = (ReadablePeriod)tuningConfig.getIntermediatePersistPeriod();
        final long deadlineMillis = DateTimes.nowUtc().plus(persistPeriod).getMillis();
        nextFlush = deadlineMillis;
    }

    /**
     * Restores sinks from the base persist directory according to the commit file.
     *
     * <p>For every sub-directory that contains an identifier file: the sink is deleted if the commit
     * records no hydrants for it; otherwise hydrant directories numbered at or beyond the committed
     * count are deleted and the remaining ones are loaded, in numeric order, into a new sink that is
     * registered, added to the timeline and announced. An I/O problem with one sink is reported as an
     * alert and that sink is skipped.
     *
     * @return the metadata stored in the commit file, or null when the base directory does not exist
     *         or cannot be listed
     * @throws ISE if the commit file cannot be read, a committed hydrant is missing on disk, or a
     *             sink named in the commit could not be loaded
     */
    private Object bootstrapSinksFromDisk() {
        Preconditions.checkState(this.sinks.isEmpty(), "Already bootstrapped?!");
        final File baseDir = this.tuningConfig.getBasePersistDirectory();
        if (!baseDir.exists()) {
            return null;
        }
        final File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }

        Committed committed;
        File commitFile = null;
        // Lock before try so that finally never unlocks a lock that was not acquired.
        this.commitLock.lock();
        try {
            commitFile = this.computeCommitFile();
            committed = commitFile.exists() ? this.objectMapper.readValue(commitFile, Committed.class) : Committed.nil();
        }
        catch (Exception e) {
            throw new ISE(e, "Failed to read commitFile: %s", commitFile);
        }
        finally {
            this.commitLock.unlock();
        }

        log.info("Loading sinks from[%s]: %s", baseDir, committed.getHydrants().keySet());
        for (File sinkDir : files) {
            final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
            if (!identifierFile.isFile()) {
                continue;
            }
            try {
                final SegmentIdentifier identifier = this.objectMapper.readValue(identifierFile, SegmentIdentifier.class);
                final int committedHydrants = committed.getCommittedHydrants(identifier.getIdentifierAsString());
                if (committedHydrants <= 0) {
                    log.info("Removing uncommitted sink at [%s]", sinkDir);
                    FileUtils.deleteDirectory(sinkDir);
                    continue;
                }

                // Hydrant directories are the entries whose names are plain integers.
                final File[] sinkFiles = sinkDir.listFiles((dir, fileName) -> Ints.tryParse(fileName) != null);
                if (sinkFiles == null) {
                    // listFiles() yields null on an I/O error; report it like any other I/O problem
                    // with this sink instead of failing with a NullPointerException below.
                    throw new IOException("Unable to list hydrant directories in sinkDir [" + sinkDir + "]");
                }
                Arrays.sort(sinkFiles, Comparator.comparingInt((File hydrantFile) -> Integer.parseInt(hydrantFile.getName())));

                final List<FireHydrant> hydrants = Lists.newArrayList();
                for (File hydrantDir : sinkFiles) {
                    final int hydrantNumber = Integer.parseInt(hydrantDir.getName());
                    if (hydrantNumber >= committedHydrants) {
                        log.info("Removing uncommitted segment at [%s]", hydrantDir);
                        FileUtils.deleteDirectory(hydrantDir);
                        continue;
                    }
                    log.info("Loading previously persisted segment at [%s]", hydrantDir);
                    // Hydrants must be contiguous starting from zero.
                    if (hydrantNumber != hydrants.size()) {
                        throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", hydrants.size(), sinkDir);
                    }
                    hydrants.add(new FireHydrant(new QueryableIndexSegment(identifier.getIdentifierAsString(), this.indexIO.loadIndex(hydrantDir)), hydrantNumber));
                }
                if (committedHydrants != hydrants.size()) {
                    throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", hydrants.size(), sinkDir);
                }

                final Sink currSink = new Sink(identifier.getInterval(), this.schema, identifier.getShardSpec(), identifier.getVersion(), this.tuningConfig.getMaxRowsInMemory(), this.tuningConfig.isReportParseExceptions(), hydrants);
                this.sinks.put(identifier, currSink);
                this.sinkTimeline.add(currSink.getInterval(), currSink.getVersion(), identifier.getShardSpec().createChunk(currSink));
                this.segmentAnnouncer.announceSegment(currSink.getSegment());
            }
            catch (IOException e) {
                log.makeAlert(e, "Problem loading sink[%s] from disk.", this.schema.getDataSource()).addData("sinkDir", sinkDir).emit();
            }
        }

        // Every sink named in the commit must have been loaded.
        final Set<String> loadedSinks = new HashSet<String>();
        for (SegmentIdentifier loaded : this.sinks.keySet()) {
            loadedSinks.add(loaded.getIdentifierAsString());
        }
        final Set<String> missingSinks = Sets.difference(committed.getHydrants().keySet(), loadedSinks);
        if (!missingSinks.isEmpty()) {
            throw new ISE("Missing committed sinks [%s]", Joiner.on(", ").join(missingSinks));
        }
        return committed.getMetadata();
    }

    /**
     * Stops further writes to {@code sink}, marks {@code identifier} as dropping, and schedules the removal of the
     * sink once the future returned by {@code pushBarrier()} has completed.
     *
     * <p>Done immediately on the calling thread: {@code sink.finishWriting()}, adding the identifier to
     * {@code droppingSinks}, and subtracting the sink's row counts from {@code rowsCurrentlyInMemory} and
     * {@code totalRows}.
     *
     * <p>Done later by the transform function, which is handed to {@code persistExecutor}: optionally rewriting the
     * commit file without this segment, unannouncing the segment, removing the sink from {@code sinks} and
     * {@code sinkTimeline}, closing cache entries for each hydrant, clearing each hydrant's segment, and optionally
     * deleting the segment's persist directory.
     *
     * @param identifier       identifier of the segment being abandoned
     * @param sink             the sink expected to be registered for {@code identifier}; nothing is removed if a
     *                         different sink is registered by the time the function runs
     * @param removeOnDiskData when true, also removes the segment from the commit file and deletes its persist
     *                         directory
     * @return the future produced by {@code Futures.transform}; the transform function itself always returns
     *         {@code null}
     */
    private ListenableFuture<?> abandonSegment(final SegmentIdentifier identifier, final Sink sink, final boolean removeOnDiskData) {
        sink.finishWriting();
        this.droppingSinks.add(identifier);
        this.rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
        this.totalRows.addAndGet(-sink.getNumRows());
        return Futures.transform(this.pushBarrier(), new Function<Object, Object>(){

            @Override
            @Nullable
            public Object apply(@Nullable Object input) {
                // Identity comparison: only proceed if the very same sink instance is still registered.
                // NOTE(review): on this early return the identifier stays in droppingSinks -- confirm that is intended.
                if (AppenderatorImpl.this.sinks.get(identifier) != sink) {
                    log.warn("Sink for segment[%s] no longer valid, not abandoning.", identifier);
                    return null;
                }
                if (removeOnDiskData) {
                    log.info("Removing commit metadata for segment[%s].", identifier);
                    // Acquire the lock BEFORE entering the try block so that the finally clause can only ever
                    // unlock a lock this thread actually holds; otherwise a failed acquisition would be masked
                    // by an IllegalMonitorStateException thrown from unlock().
                    AppenderatorImpl.this.commitLock.lock();
                    try {
                        Committed oldCommit = AppenderatorImpl.this.readCommit();
                        // readCommit() returns null when no commit file exists; nothing to rewrite in that case.
                        if (oldCommit != null) {
                            AppenderatorImpl.this.writeCommit(oldCommit.without(identifier.getIdentifierAsString()));
                        }
                    }
                    catch (Exception e) {
                        log.makeAlert(e, "Failed to update committed segments[%s]", AppenderatorImpl.this.schema.getDataSource()).addData("identifier", identifier.getIdentifierAsString()).emit();
                        // A failed commit update aborts the abandon: the sink is left in place.
                        throw Throwables.propagate(e);
                    }
                    finally {
                        AppenderatorImpl.this.commitLock.unlock();
                    }
                }
                try {
                    AppenderatorImpl.this.segmentAnnouncer.unannounceSegment(sink.getSegment());
                }
                catch (Exception e) {
                    // Best effort: an unannounce failure is alerted but does not stop the removal below.
                    log.makeAlert(e, "Failed to unannounce segment[%s]", AppenderatorImpl.this.schema.getDataSource()).addData("identifier", identifier.getIdentifierAsString()).emit();
                }
                log.info("Removing sink for segment[%s].", identifier);
                AppenderatorImpl.this.sinks.remove(identifier);
                AppenderatorImpl.this.metrics.setSinkCount(AppenderatorImpl.this.sinks.size());
                AppenderatorImpl.this.droppingSinks.remove(identifier);
                AppenderatorImpl.this.sinkTimeline.remove(sink.getInterval(), sink.getVersion(), identifier.getShardSpec().createChunk(sink));
                for (FireHydrant hydrant : sink) {
                    // The cache is optional; only close per-hydrant cache entries when one is configured.
                    if (AppenderatorImpl.this.cache != null) {
                        AppenderatorImpl.this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
                    }
                    hydrant.swapSegment(null);
                }
                if (removeOnDiskData) {
                    AppenderatorImpl.this.removeDirectory(AppenderatorImpl.this.computePersistDir(identifier));
                }
                return null;
            }
        }, this.persistExecutor);
    }

    /**
     * Loads the commit metadata from the file returned by {@code computeCommitFile()}.
     *
     * @return the deserialized {@code Committed}, or {@code null} when the commit file does not exist
     * @throws IOException if the object mapper fails to read the file
     */
    private Committed readCommit() throws IOException {
        final File source = computeCommitFile();
        if (!source.exists()) {
            // No commit has been written yet.
            return null;
        }
        return objectMapper.readValue(source, Committed.class);
    }

    /**
     * Serializes {@code newCommit} with the object mapper into the file returned by {@code computeCommitFile()}.
     *
     * @param newCommit commit metadata to write
     * @throws IOException if the object mapper fails to write the file
     */
    private void writeCommit(Committed newCommit) throws IOException {
        objectMapper.writeValue(computeCommitFile(), newCommit);
    }

    /**
     * @return the {@code commit.json} file directly under the tuning config's base persist directory
     */
    private File computeCommitFile() {
        final String commitFileName = "commit.json";
        return new File(tuningConfig.getBasePersistDirectory(), commitFileName);
    }

    /**
     * @return the {@code .lock} file directly under the tuning config's base persist directory
     */
    private File computeLockFile() {
        final String lockFileName = ".lock";
        return new File(tuningConfig.getBasePersistDirectory(), lockFileName);
    }

    /**
     * Resolves the per-segment persist directory: a child of the base persist directory named after the
     * identifier's string form.
     *
     * @param identifier segment whose persist directory is wanted
     * @return the directory path (not created by this method)
     */
    private File computePersistDir(SegmentIdentifier identifier) {
        final String dirName = identifier.getIdentifierAsString();
        return new File(tuningConfig.getBasePersistDirectory(), dirName);
    }

    /**
     * @param identifier segment whose identifier file is wanted
     * @return the file named {@code IDENTIFIER_FILE_NAME} inside the segment's persist directory
     */
    private File computeIdentifierFile(SegmentIdentifier identifier) {
        final File segmentDir = computePersistDir(identifier);
        return new File(segmentDir, IDENTIFIER_FILE_NAME);
    }

    /**
     * @param identifier segment whose descriptor file is wanted
     * @return the {@code descriptor.json} file inside the segment's persist directory
     */
    private File computeDescriptorFile(SegmentIdentifier identifier) {
        final File segmentDir = computePersistDir(identifier);
        return new File(segmentDir, "descriptor.json");
    }

    /**
     * Makes sure the segment's persist directory exists and (re)writes the identifier file inside it.
     *
     * @param identifier segment whose persist directory should be prepared
     * @return the segment's persist directory
     * @throws IOException if creating the directory or writing the identifier file fails
     */
    private File createPersistDirIfNeeded(SegmentIdentifier identifier) throws IOException {
        final File segmentDir = computePersistDir(identifier);
        FileUtils.forceMkdir(segmentDir);
        // The identifier file is written on every call, not only when the directory was just created.
        final File identifierFile = computeIdentifierFile(identifier);
        objectMapper.writeValue(identifierFile, identifier);
        return segmentDir;
    }

    /**
     * Persists the index of {@code indexToPersist} to disk and swaps the hydrant over to a segment loaded from the
     * persisted files.
     *
     * <p>The whole operation runs while holding the monitor of {@code indexToPersist}. The index is written to a
     * sub-directory of the segment's persist directory named after the hydrant's count.
     *
     * @param indexToPersist hydrant whose index should be persisted
     * @param identifier     segment the hydrant belongs to; selects the persist directory and interval
     * @return the number of rows in the hydrant's index as measured before persisting, or {@code 0} when the
     *         hydrant had already been swapped and nothing was done
     * @throws RuntimeException wrapping the {@link IOException} (via {@code Throwables.propagate}) when preparing
     *                          the directory, persisting, or loading the persisted index fails
     */
    private int persistHydrant(FireHydrant indexToPersist, SegmentIdentifier identifier) {
        synchronized (indexToPersist) {
            if (indexToPersist.hasSwapped()) {
                log.info("Segment[%s], Hydrant[%s] already swapped. Ignoring request to persist.", identifier, indexToPersist);
                return 0;
            }
            log.info("Segment[%s], persisting Hydrant[%s]", identifier, indexToPersist);
            try {
                // Capture the row count before the swap below replaces the hydrant's segment.
                int numRows = indexToPersist.getIndex().size();
                File persistDir = this.createPersistDirIfNeeded(identifier);
                IndexSpec indexSpec = this.tuningConfig.getIndexSpec();
                File persistedFile = this.indexMerger.persist(indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), indexSpec, this.tuningConfig.getSegmentWriteOutMediumFactory());
                indexToPersist.swapSegment(new QueryableIndexSegment(indexToPersist.getSegmentIdentifier(), this.indexIO.loadIndex(persistedFile)));
                return numRows;
            }
            catch (IOException e) {
                // Attach the exception to the alert, as the other alerts in this class do, so the cause is not lost.
                log.makeAlert(e, "dataSource[%s] -- incremental persist failed", this.schema.getDataSource()).addData("segment", identifier.getIdentifierAsString()).addData("count", indexToPersist.getCount()).emit();
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Recursively deletes {@code target} if it exists. Failures are reported through an alert and never rethrown.
     *
     * <p>The alert message is formatted with the schema's data source; the path itself is attached as the
     * {@code "file"} data entry.
     *
     * @param target directory to delete
     */
    private void removeDirectory(File target) {
        if (!target.exists()) {
            return;
        }
        try {
            log.info("Deleting Index File[%s]", target);
            FileUtils.deleteDirectory(target);
        }
        catch (Exception e) {
            log.makeAlert(e, "Failed to remove directory[%s]", schema.getDataSource()).addData("file", target).emit();
        }
    }
}

