package org.apache.hive.druid.io.druid.segment.realtime.appenderator;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Stopwatch;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.primitives.Ints;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.com.metamx.emitter.service.ServiceEmitter;
import org.apache.hive.druid.io.druid.client.cache.Cache;
import org.apache.hive.druid.io.druid.client.cache.CacheConfig;
import org.apache.hive.druid.io.druid.common.guava.ThreadRenamingCallable;
import org.apache.hive.druid.io.druid.concurrent.Execs;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.IAE;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.query.DruidMetrics;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.hive.druid.io.druid.query.QuerySegmentWalker;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.segment.IndexIO;
import org.apache.hive.druid.io.druid.segment.IndexMerger;
import org.apache.hive.druid.io.druid.segment.QueryableIndex;
import org.apache.hive.druid.io.druid.segment.QueryableIndexSegment;
import org.apache.hive.druid.io.druid.segment.incremental.IndexSizeExceededException;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.loading.DataSegmentPusher;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.FireHydrant;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Sink;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/appenderator/AppenderatorImpl.class */
public class AppenderatorImpl implements Appenderator {
    // Class-wide logger; also used to emit alerts via makeAlert(...).
    private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
    // NOTE(review): presumably the millisecond threshold above which persistAll() logs a
    // throttling warning (that method compares against a literal 1000) — confirm.
    private static final int WARN_DELAY = 1000;
    // Name of the per-sink file that bootstrapSinksFromDisk() reads a SegmentIdentifier from.
    private static final String IDENTIFIER_FILE_NAME = "identifier.json";
    // Collaborators supplied through the constructor; all but `cache` are null-checked there.
    private final DataSchema schema;
    private final AppenderatorConfig tuningConfig;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentPusher dataSegmentPusher;
    private final ObjectMapper objectMapper;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final IndexIO indexIO;
    private final IndexMerger indexMerger;
    private final Cache cache;
    // Query delegate; null when the constructor was given no QueryRunnerFactoryConglomerate,
    // in which case the getQueryRunnerFor* methods throw IllegalStateException.
    private final QuerySegmentWalker texasRanger;
    // Epoch millis after which add() triggers a persistAll(); recomputed by resetNextFlush().
    private volatile long nextFlush;
    // Sinks currently known to this appenderator, keyed by segment identifier.
    private final Map<SegmentIdentifier, Sink> sinks = Maps.newConcurrentMap();
    // Identifiers that abandonSegment() has started dropping; push() skips these.
    private final Set<SegmentIdentifier> droppingSinks = Sets.newConcurrentHashSet();
    // Timeline of the sinks, keyed by String version; handed to the SinkQuerySegmentWalker.
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER);
    // Running count of in-memory rows across sinks; adjusted in add(), zeroed in persistAll().
    private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
    // Executors are created lazily by initializeExecutors() and stopped by shutdownExecutors().
    private volatile ListeningExecutorService persistExecutor = null;
    private volatile ListeningExecutorService pushExecutor = null;
    // File lock (and its channel) on the base persist directory; set by
    // lockBasePersistDirectory() and released by unlockBasePersistDirectory().
    private volatile FileLock basePersistDirLock = null;
    private volatile FileChannel basePersistDirLockChannel = null;

    /**
     * Wires up an appenderator for the data source named by {@code dataSchema}.
     *
     * <p>Every collaborator except {@code cache}, {@code cacheConfig}, {@code serviceEmitter},
     * {@code executorService} and {@code queryRunnerFactoryConglomerate} is null-checked here.
     * When {@code queryRunnerFactoryConglomerate} is null no query walker is created and
     * {@code cache} may be null as well; otherwise {@code cache} must be non-null.
     *
     * @throws NullPointerException if a required collaborator is null
     */
    public AppenderatorImpl(DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentAnnouncer dataSegmentAnnouncer, ServiceEmitter serviceEmitter, ExecutorService executorService, IndexIO indexIO, IndexMerger indexMerger, Cache cache, CacheConfig cacheConfig) {
        this.schema = Preconditions.checkNotNull(dataSchema, "schema");
        this.tuningConfig = Preconditions.checkNotNull(appenderatorConfig, "tuningConfig");
        this.metrics = Preconditions.checkNotNull(fireDepartmentMetrics, "metrics");
        this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.segmentAnnouncer = Preconditions.checkNotNull(dataSegmentAnnouncer, "segmentAnnouncer");
        this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
        this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
        this.cache = cache;

        if (queryRunnerFactoryConglomerate == null) {
            // Querying is disabled; the getQueryRunnerFor* methods will refuse to run.
            this.texasRanger = null;
        } else {
            this.texasRanger = new SinkQuerySegmentWalker(
                    dataSchema.getDataSource(),
                    this.sinkTimeline,
                    objectMapper,
                    serviceEmitter,
                    queryRunnerFactoryConglomerate,
                    executorService,
                    Preconditions.checkNotNull(cache, "cache"),
                    cacheConfig);
        }

        log.info("Created Appenderator for dataSource[%s].", dataSchema.getDataSource());
    }

    /** Returns the data source name reported by this appenderator's schema. */
    @Override
    public String getDataSource() {
        final DataSchema dataSchema = this.schema;
        return dataSchema.getDataSource();
    }

    /**
     * Prepares the appenderator for use: creates and locks the base persist directory,
     * reloads any sinks found on disk, creates the executors and schedules the next flush.
     *
     * @return whatever {@code bootstrapSinksFromDisk()} returned (may be null)
     */
    @Override
    public Object startJob() {
        final File baseDir = this.tuningConfig.getBasePersistDirectory();
        baseDir.mkdirs();
        lockBasePersistDirectory();

        // Sinks are restored before the executors exist, exactly as in the original ordering.
        final Object restoredMetadata = bootstrapSinksFromDisk();
        initializeExecutors();
        resetNextFlush();
        return restoredMetadata;
    }

    /**
     * Adds one row to the sink for {@code segmentIdentifier}, creating the sink if needed,
     * and triggers {@link #persistAll} when the sink cannot take more rows, the flush
     * deadline has passed, or the in-memory row budget is reached.
     *
     * @param segmentIdentifier segment the row belongs to; its data source must match this appenderator's
     * @param inputRow          the row to add
     * @param supplier          supplies the committer handed to {@code persistAll} when a persist is triggered
     * @return the sink's row count after the add, as reported by {@code Sink.getNumRows()}
     * @throws IAE                         if the identifier names a different data source
     * @throws IndexSizeExceededException  if the sink rejects the row as over capacity
     * @throws SegmentNotWritableException if the sink reports a negative result for the add
     */
    @Override
    public int add(SegmentIdentifier segmentIdentifier, InputRow inputRow, Supplier<Committer> supplier) throws IndexSizeExceededException, SegmentNotWritableException {
        if (!segmentIdentifier.getDataSource().equals(this.schema.getDataSource())) {
            throw new IAE("Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", this.schema.getDataSource(), segmentIdentifier.getDataSource());
        }

        final Sink sink = getOrCreateSink(segmentIdentifier);
        this.metrics.reportMessageMaxTimestamp(inputRow.getTimestampFromEpoch());

        final int rowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
        final int rowsInMemoryAfterAdd;
        try {
            rowsInMemoryAfterAdd = sink.add(inputRow);
        } catch (IndexSizeExceededException e) {
            // Persisting is supposed to keep sinks from filling up, so this is unexpected.
            log.error(e, "Sink for segment[%s] was unexpectedly full!", segmentIdentifier);
            throw e;
        }

        if (rowsInMemoryAfterAdd < 0) {
            throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", segmentIdentifier);
        }

        // The delta can be zero when the sink did not grow its in-memory row count for this row.
        this.rowsCurrentlyInMemory.addAndGet(rowsInMemoryAfterAdd - rowsInMemoryBeforeAdd);

        final boolean persistNeeded = !sink.canAppendRow()
                || System.currentTimeMillis() > this.nextFlush
                || this.rowsCurrentlyInMemory.get() >= this.tuningConfig.getMaxRowsInMemory();
        if (persistNeeded) {
            // Supplier#get() is the real method; "get2" was a decompiler-invented name.
            persistAll(supplier.get());
        }
        return sink.getNumRows();
    }

    /** Returns an immutable snapshot of the identifiers of all sinks currently held. */
    @Override
    public List<SegmentIdentifier> getSegments() {
        final Set<SegmentIdentifier> identifiers = this.sinks.keySet();
        return ImmutableList.copyOf(identifiers);
    }

    /**
     * Returns the row count of the sink registered for {@code segmentIdentifier}.
     *
     * @throws ISE if no sink is registered for the identifier
     */
    @Override
    public int getRowCount(SegmentIdentifier segmentIdentifier) {
        final Sink found = this.sinks.get(segmentIdentifier);
        if (found != null) {
            return found.getNumRows();
        }
        throw new ISE("No such sink: %s", segmentIdentifier);
    }

    /** Exposes the current value of the in-memory row counter for tests. */
    @VisibleForTesting
    int getRowsInMemory() {
        return this.rowsCurrentlyInMemory.intValue();
    }

    /**
     * Returns the sink registered for {@code segmentIdentifier}, creating, announcing and
     * registering a new one (in the sink map and the timeline) when none exists yet.
     *
     * <p>An announcement failure is reported as an alert but does not prevent the sink
     * from being registered.
     */
    private Sink getOrCreateSink(SegmentIdentifier segmentIdentifier) {
        final Sink existing = this.sinks.get(segmentIdentifier);
        if (existing != null) {
            return existing;
        }

        final Sink created = new Sink(
                segmentIdentifier.getInterval(),
                this.schema,
                segmentIdentifier.getShardSpec(),
                segmentIdentifier.getVersion(),
                this.tuningConfig.getMaxRowsInMemory(),
                this.tuningConfig.isReportParseExceptions());

        try {
            this.segmentAnnouncer.announceSegment(created.getSegment());
        } catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", this.schema.getDataSource()).addData(DruidMetrics.INTERVAL, created.getInterval()).emit();
        }

        this.sinks.put(segmentIdentifier, created);
        this.metrics.setSinkCount(this.sinks.size());
        // The timeline is versioned by String, so the sink's version is passed as-is
        // (the decompiled cast to Interval was invalid).
        this.sinkTimeline.add(created.getInterval(), created.getVersion(), segmentIdentifier.getShardSpec().createChunk(created));
        return created;
    }

    /**
     * Delegates interval-based query planning to the internal walker.
     *
     * @throws IllegalStateException if this appenderator was built without query support
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        final QuerySegmentWalker walker = this.texasRanger;
        if (walker != null) {
            return walker.getQueryRunnerForIntervals(query, iterable);
        }
        throw new IllegalStateException("Don't query me, bro.");
    }

    /**
     * Delegates segment-descriptor-based query planning to the internal walker.
     *
     * @throws IllegalStateException if this appenderator was built without query support
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        final QuerySegmentWalker walker = this.texasRanger;
        if (walker != null) {
            return walker.getQueryRunnerForSegments(query, iterable);
        }
        throw new IllegalStateException("Don't query me, bro.");
    }

    /**
     * Drops everything this appenderator holds: first overwrites the commit file with an
     * empty {@code Committed} on the persist executor, then abandons every sink (removing
     * its commit metadata) and waits for all of that to finish.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void clear() throws InterruptedException {
        final Callable<Object> resetCommitFile = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                AppenderatorImpl.this.objectMapper.writeValue(AppenderatorImpl.this.computeCommitFile(), Committed.nil());
                return null;
            }
        };

        try {
            // Wait for the commit file reset before touching the sinks.
            this.persistExecutor.submit(resetCommitFile).get();

            final List<ListenableFuture<?>> abandonments = Lists.newArrayList();
            for (Map.Entry<SegmentIdentifier, Sink> sinkEntry : this.sinks.entrySet()) {
                abandonments.add(abandonSegment(sinkEntry.getKey(), sinkEntry.getValue(), true));
            }
            Futures.allAsList(abandonments).get();
        } catch (ExecutionException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Abandons the sink for {@code segmentIdentifier}, removing its commit metadata.
     *
     * @return the abandonment future, or an already-completed future when no such sink exists
     */
    @Override
    public ListenableFuture<?> drop(SegmentIdentifier segmentIdentifier) {
        final Sink target = this.sinks.get(segmentIdentifier);
        if (target == null) {
            return Futures.immediateFuture(null);
        }
        return abandonSegment(segmentIdentifier, target, true);
    }

    /**
     * Schedules a persist of all not-yet-persisted hydrants of all sinks, followed by a
     * commit of {@code committer} and a rewrite of the commit file.
     *
     * <p>On the calling thread this collects the hydrants to persist (swapping any sink that
     * reports itself swappable), captures the committer's metadata, submits the work to the
     * persist executor, records how long the submission took as back-pressure, resets the
     * next-flush deadline and zeroes the in-memory row counter.
     *
     * @param committer committer whose metadata is recorded and which is run after the hydrants are persisted
     * @return a future yielding the committer's metadata once the persist and commit have finished
     */
    @Override
    public ListenableFuture<Object> persistAll(final Committer committer) {
        // Hydrant count per segment at the time of this call; written to the commit file.
        final Map<SegmentIdentifier, Integer> commitHydrants = Maps.newHashMap();
        final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();

        for (Map.Entry<SegmentIdentifier, Sink> sinkEntry : this.sinks.entrySet()) {
            final SegmentIdentifier identifier = sinkEntry.getKey();
            final Sink sink = sinkEntry.getValue();
            final List<FireHydrant> hydrants = Lists.newArrayList(sink);
            commitHydrants.put(identifier, hydrants.size());

            // For a writable sink the last hydrant is skipped here; it is picked up by the
            // swap below when the sink is swappable.
            final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
            for (FireHydrant hydrant : hydrants.subList(0, limit)) {
                if (!hydrant.hasSwapped()) {
                    log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
                    indexesToPersist.add(Pair.of(hydrant, identifier));
                }
            }

            if (sink.swappable()) {
                indexesToPersist.add(Pair.of(sink.swap(), identifier));
            }
        }

        log.info("Submitting persist runnable for dataSource[%s]", this.schema.getDataSource());

        final String threadName = String.format("%s-incremental-persist", this.schema.getDataSource());
        final Object commitMetadata = committer.getMetadata();
        final Stopwatch runExecStopwatch = Stopwatch.createStarted();
        final Stopwatch persistStopwatch = Stopwatch.createStarted();

        final ListenableFuture<Object> future = this.persistExecutor.submit(new ThreadRenamingCallable<Object>(threadName) {
            @Override
            public Object doCall() {
                try {
                    for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
                        AppenderatorImpl.this.metrics.incrementRowOutputCount(AppenderatorImpl.this.persistHydrant(pair.lhs, pair.rhs));
                    }

                    AppenderatorImpl.log.info("Committing metadata[%s] for sinks[%s].", commitMetadata, Joiner.on(", ").join(Iterables.transform(commitHydrants.entrySet(), new Function<Map.Entry<SegmentIdentifier, Integer>, String>() {
                        @Override
                        public String apply(Map.Entry<SegmentIdentifier, Integer> entry) {
                            return String.format("%s:%d", entry.getKey().getIdentifierAsString(), entry.getValue());
                        }
                    })));

                    committer.run();
                    AppenderatorImpl.this.objectMapper.writeValue(AppenderatorImpl.this.computeCommitFile(), Committed.create(commitHydrants, commitMetadata));
                    return commitMetadata;
                } catch (Exception e) {
                    AppenderatorImpl.this.metrics.incrementFailedPersists();
                    throw Throwables.propagate(e);
                } finally {
                    // Runs exactly once whether the persist succeeded or failed.
                    AppenderatorImpl.this.metrics.incrementNumPersists();
                    AppenderatorImpl.this.metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
                    persistStopwatch.stop();
                }
            }
        });

        // Time spent inside submit() is reported as persist back-pressure.
        final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > WARN_DELAY) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
        }
        runExecStopwatch.stop();

        resetNextFlush();
        // All in-memory rows are now either persisted or queued for persisting.
        this.rowsCurrentlyInMemory.set(0);
        return future;
    }

    /**
     * Marks the given segments as finished, persists everything, and then merges and pushes
     * each of those segments on the push executor.
     *
     * <p>Segments that are being dropped at push time, or for which {@code mergeAndPush}
     * returns null, are skipped and do not appear in the result.
     *
     * @param list      identifiers of the segments to push; each must have a sink
     * @param committer committer passed to {@link #persistAll}
     * @return a future yielding the pushed segments together with the persisted commit metadata
     * @throws NullPointerException if any identifier has no sink
     */
    @Override
    public ListenableFuture<SegmentsAndMetadata> push(List<SegmentIdentifier> list, Committer committer) {
        final Map<SegmentIdentifier, Sink> sinksToPush = Maps.newHashMap();
        for (SegmentIdentifier identifier : list) {
            final Sink sink = this.sinks.get(identifier);
            if (sink == null) {
                throw new NullPointerException("No sink for identifier: " + identifier);
            }
            sinksToPush.put(identifier, sink);
            // No more rows may be added once a push has been requested.
            sink.finishWriting();
        }

        return Futures.transform(persistAll(committer), new Function<Object, SegmentsAndMetadata>() {
            @Override
            public SegmentsAndMetadata apply(Object commitMetadata) {
                final List<DataSegment> pushedSegments = Lists.newArrayList();
                for (Map.Entry<SegmentIdentifier, Sink> entry : sinksToPush.entrySet()) {
                    if (AppenderatorImpl.this.droppingSinks.contains(entry.getKey())) {
                        AppenderatorImpl.log.info("Skipping push of currently-dropping sink[%s]", entry.getKey());
                        continue;
                    }

                    final DataSegment pushed = AppenderatorImpl.this.mergeAndPush(entry.getKey(), entry.getValue());
                    if (pushed != null) {
                        pushedSegments.add(pushed);
                    } else {
                        AppenderatorImpl.log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey());
                    }
                }
                return new SegmentsAndMetadata(pushedSegments, commitMetadata);
            }
        }, this.pushExecutor);
    }

    /**
     * Submits a task that does nothing to the push executor and returns its future, so
     * callers can chain work behind whatever was submitted to that executor before it.
     */
    private ListenableFuture<?> pushBarrier() {
        final Runnable noOp = new Runnable() {
            @Override
            public void run() {
                // Intentionally empty.
            }
        };
        return this.pushExecutor.submit(noOp);
    }

    /**
     * Merges all hydrants of {@code sink} into one index, pushes it, and records the
     * resulting descriptor on disk.
     *
     * <p>If a descriptor file already exists the segment is treated as already pushed and
     * the stored descriptor is returned without merging again.
     *
     * @return the pushed segment, or null when {@code sink} is no longer the sink registered
     *         for {@code segmentIdentifier}
     * @throws ISE if the sink is still writable or has a hydrant that has not been swapped
     */
    public DataSegment mergeAndPush(SegmentIdentifier segmentIdentifier, Sink sink) {
        if (sink != this.sinks.get(segmentIdentifier)) {
            log.warn("Sink for segment[%s] no longer valid, bailing out of mergeAndPush.", segmentIdentifier);
            return null;
        }

        final File mergedTarget = new File(computePersistDir(segmentIdentifier), "merged");
        final File descriptorFile = computeDescriptorFile(segmentIdentifier);

        // Sanity checks: nothing may still be writable or unpersisted at this point.
        for (FireHydrant hydrant : sink) {
            if (sink.isWritable()) {
                throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", segmentIdentifier);
            }
            synchronized (hydrant) {
                if (!hydrant.hasSwapped()) {
                    throw new ISE("WTF?! Expected sink to be fully persisted before mergeAndPush. Segment[%s].", segmentIdentifier);
                }
            }
        }

        try {
            if (descriptorFile.exists()) {
                log.info("Segment[%s] already pushed.", segmentIdentifier);
                return this.objectMapper.readValue(descriptorFile, DataSegment.class);
            }

            log.info("Pushing merged index for segment[%s].", segmentIdentifier);

            removeDirectory(mergedTarget);
            if (mergedTarget.exists()) {
                throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
            }

            final List<QueryableIndex> indexes = Lists.newArrayList();
            for (FireHydrant hydrant : sink) {
                final QueryableIndex index = hydrant.getSegment().asQueryableIndex();
                log.info("Adding hydrant[%s]", hydrant);
                indexes.add(index);
            }

            final File mergedFile = this.indexMerger.mergeQueryableIndex(
                    indexes,
                    this.schema.getGranularitySpec().isRollup(),
                    this.schema.getAggregators(),
                    mergedTarget,
                    this.tuningConfig.getIndexSpec());

            // The pushed descriptor carries the dimensions actually present in the merged index.
            final DataSegment sinkSegment = sink.getSegment();
            final QueryableIndex mergedIndex = this.indexIO.loadIndex(mergedFile);
            final DataSegment pushed = this.dataSegmentPusher.push(
                    mergedFile,
                    sinkSegment.withDimensions(Lists.newArrayList(mergedIndex.getAvailableDimensions())));

            this.objectMapper.writeValue(descriptorFile, pushed);
            log.info("Pushed merged index for segment[%s], descriptor is: %s", segmentIdentifier, pushed);
            return pushed;
        } catch (Exception e) {
            this.metrics.incrementFailedHandoffs();
            log.warn(e, "Failed to push merged index for segment[%s].", segmentIdentifier);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Shuts the appenderator down: abandons every sink without removing its commit
     * metadata, stops both executors, waits for them to terminate and releases the lock
     * on the base persist directory.
     *
     * <p>Failures while abandoning sinks are logged and do not stop the shutdown.
     *
     * @throws ISE if interrupted while waiting for the executors to terminate; the
     *             interrupt flag is restored and the interruption is attached as the cause
     */
    @Override
    public void close() {
        log.info("Shutting down...");

        final List<ListenableFuture<?>> abandonments = Lists.newArrayList();
        for (Map.Entry<SegmentIdentifier, Sink> sinkEntry : this.sinks.entrySet()) {
            abandonments.add(abandonSegment(sinkEntry.getKey(), sinkEntry.getValue(), false));
        }

        try {
            Futures.allAsList(abandonments).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn(e, "Interrupted during close()");
        } catch (ExecutionException e) {
            log.warn(e, "Unable to abandon existing segments during close()");
        }

        try {
            shutdownExecutors();
            Preconditions.checkState(this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.pushExecutor.awaitTermination(365L, TimeUnit.DAYS), "pushExecutor not terminated");
            unlockBasePersistDirectory();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the interruption as the cause so the stack trace shows where the wait was cut short.
            throw new ISE(e, "Failed to shutdown executors during close()");
        }
    }

    /**
     * Takes a file lock on the lock file of the base persist directory, unless this
     * instance already holds one.
     *
     * <p>The channel and lock are only stored on this instance once the lock has actually
     * been acquired; if the attempt fails for any reason the channel is closed again so
     * no file handle is left open.
     *
     * @throws ISE if the lock is held elsewhere ({@code tryLock()} returned null)
     */
    private void lockBasePersistDirectory() {
        if (this.basePersistDirLock != null) {
            return;
        }

        try {
            final FileChannel channel = FileChannel.open(computeLockFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            boolean acquired = false;
            try {
                final FileLock lock = channel.tryLock();
                if (lock == null) {
                    throw new ISE("Cannot acquire lock on basePersistDir: %s", computeLockFile());
                }
                this.basePersistDirLockChannel = channel;
                this.basePersistDirLock = lock;
                acquired = true;
            } finally {
                if (!acquired) {
                    try {
                        channel.close();
                    } catch (IOException closeFailure) {
                        // Do not let a close failure hide the reason the lock attempt failed.
                        log.warn(closeFailure, "Failed to close lock channel for basePersistDir after unsuccessful lock attempt");
                    }
                }
            }
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Releases the base persist directory lock and closes its channel, if a lock is held.
     * Does nothing when no lock is held.
     */
    private void unlockBasePersistDirectory() {
        try {
            final FileLock heldLock = this.basePersistDirLock;
            if (heldLock == null) {
                return;
            }
            heldLock.release();
            this.basePersistDirLockChannel.close();
            this.basePersistDirLock = null;
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates the persist and push executors if they do not exist yet. The persist
     * executor's capacity comes from the configured maximum number of pending persists;
     * the push executor is created with a capacity of one.
     */
    private void initializeExecutors() {
        final int pendingPersistLimit = this.tuningConfig.getMaxPendingPersists();

        if (this.persistExecutor == null) {
            final ExecutorService persistDelegate = Execs.newBlockingSingleThreaded("appenderator_persist_%d", pendingPersistLimit);
            this.persistExecutor = MoreExecutors.listeningDecorator(persistDelegate);
        }

        if (this.pushExecutor == null) {
            final ExecutorService pushDelegate = Execs.newBlockingSingleThreaded("appenderator_merge_%d", 1);
            this.pushExecutor = MoreExecutors.listeningDecorator(pushDelegate);
        }
    }

    /** Requests immediate shutdown of the persist executor and then the push executor. */
    private void shutdownExecutors() {
        final ListeningExecutorService persisting = this.persistExecutor;
        persisting.shutdownNow();

        final ListeningExecutorService pushing = this.pushExecutor;
        pushing.shutdownNow();
    }

    /** Sets the next flush deadline to now plus the configured intermediate persist period. */
    private void resetNextFlush() {
        final DateTime now = new DateTime();
        final DateTime deadline = now.plus(this.tuningConfig.getIntermediatePersistPeriod());
        this.nextFlush = deadline.getMillis();
    }

    /**
     * Rebuilds the in-memory sinks from what is on disk under the base persist directory.
     *
     * <p>The commit file (if any) says how many hydrants were committed per segment. Each
     * sub-directory containing an identifier file is then examined: sinks with no committed
     * hydrants are deleted, hydrant directories at or beyond the committed count are deleted,
     * and the remaining hydrants are loaded into a sink that is registered, added to the
     * timeline and announced. An I/O failure while loading one sink is reported as an alert
     * and that sink is skipped.
     *
     * <p>Only a failure to read the commit file itself is reported as
     * "Failed to read commitFile"; consistency errors found later keep their own message.
     *
     * @return the metadata stored in the commit file, or null when the base persist
     *         directory does not exist or cannot be listed
     * @throws ISE if the commit file cannot be read, a committed hydrant or sink is missing,
     *             or a sink directory cannot be listed
     */
    private Object bootstrapSinksFromDisk() {
        Preconditions.checkState(this.sinks.isEmpty(), "Already bootstrapped?!");

        final File baseDir = this.tuningConfig.getBasePersistDirectory();
        if (!baseDir.exists()) {
            return null;
        }
        final File[] sinkDirs = baseDir.listFiles();
        if (sinkDirs == null) {
            return null;
        }

        final File commitFile = computeCommitFile();
        final Committed committed;
        try {
            if (commitFile.exists()) {
                committed = this.objectMapper.readValue(commitFile, Committed.class);
            } else {
                committed = Committed.nil();
            }
        } catch (Exception e) {
            throw new ISE(e, "Failed to read commitFile: %s", commitFile);
        }

        log.info("Loading sinks from[%s]: %s", baseDir, committed.getHydrants().keySet());

        for (File sinkDir : sinkDirs) {
            final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
            if (!identifierFile.isFile()) {
                // Not a sink directory.
                continue;
            }

            try {
                final SegmentIdentifier identifier = this.objectMapper.readValue(identifierFile, SegmentIdentifier.class);
                final int committedHydrants = committed.getCommittedHydrants(identifier.getIdentifierAsString());

                if (committedHydrants <= 0) {
                    log.info("Removing uncommitted sink at [%s]", sinkDir);
                    FileUtils.deleteDirectory(sinkDir);
                    continue;
                }

                // Hydrant directories are the ones whose name parses as an int.
                final File[] hydrantDirs = sinkDir.listFiles(new FilenameFilter() {
                    @Override
                    public boolean accept(File dir, String fileName) {
                        return Ints.tryParse(fileName) != null;
                    }
                });
                if (hydrantDirs == null) {
                    throw new ISE("Unable to list hydrant directories in sinkDir [%s].", sinkDir);
                }

                Arrays.sort(hydrantDirs, new Comparator<File>() {
                    @Override
                    public int compare(File left, File right) {
                        return Ints.compare(Integer.parseInt(left.getName()), Integer.parseInt(right.getName()));
                    }
                });

                final List<FireHydrant> hydrants = Lists.newArrayList();
                for (File hydrantDir : hydrantDirs) {
                    final int hydrantNumber = Integer.parseInt(hydrantDir.getName());

                    if (hydrantNumber >= committedHydrants) {
                        log.info("Removing uncommitted segment at [%s]", hydrantDir);
                        FileUtils.deleteDirectory(hydrantDir);
                    } else {
                        log.info("Loading previously persisted segment at [%s]", hydrantDir);
                        // Hydrants must be numbered 0..n-1 without gaps.
                        if (hydrantNumber != hydrants.size()) {
                            throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", Integer.valueOf(hydrants.size()), sinkDir);
                        }
                        hydrants.add(new FireHydrant(new QueryableIndexSegment(identifier.getIdentifierAsString(), this.indexIO.loadIndex(hydrantDir)), hydrantNumber));
                    }
                }

                if (committedHydrants != hydrants.size()) {
                    throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", Integer.valueOf(hydrants.size()), sinkDir);
                }

                final Sink restored = new Sink(
                        identifier.getInterval(),
                        this.schema,
                        identifier.getShardSpec(),
                        identifier.getVersion(),
                        this.tuningConfig.getMaxRowsInMemory(),
                        this.tuningConfig.isReportParseExceptions(),
                        hydrants);
                this.sinks.put(identifier, restored);
                // The timeline is versioned by String; the decompiled cast to Interval was invalid.
                this.sinkTimeline.add(restored.getInterval(), restored.getVersion(), identifier.getShardSpec().createChunk(restored));
                this.segmentAnnouncer.announceSegment(restored.getSegment());
            } catch (IOException e) {
                log.makeAlert(e, "Problem loading sink[%s] from disk.", this.schema.getDataSource()).addData("sinkDir", sinkDir).emit();
            }
        }

        // Every sink named in the commit file must have been loaded.
        final Set<String> loadedSinks = Sets.newHashSet(Iterables.transform(this.sinks.keySet(), new Function<SegmentIdentifier, String>() {
            @Override
            public String apply(SegmentIdentifier loaded) {
                return loaded.getIdentifierAsString();
            }
        }));
        final Set<String> missingSinks = Sets.difference(committed.getHydrants().keySet(), loadedSinks);
        if (!missingSinks.isEmpty()) {
            throw new ISE("Missing committed sinks [%s]", Joiner.on(", ").join(missingSinks));
        }

        return committed.getMetadata();
    }

    /**
     * Stops writes to a sink and schedules its removal from this appenderator.
     *
     * <p>Synchronously, the sink is told to finish writing, its identifier is added to
     * {@code droppingSinks}, and its in-memory row count is subtracted from
     * {@code rowsCurrentlyInMemory}. The rest of the cleanup is chained onto the future returned by
     * {@link #pushBarrier()} and runs on {@code persistExecutor}: the segment is unannounced, the sink
     * is removed from {@code sinks} and {@code sinkTimeline}, and the cache entries of its hydrants
     * are closed when a cache is configured.
     *
     * @param segmentIdentifier identifier of the segment whose sink is being abandoned
     * @param sink              the sink to abandon; cleanup is skipped if, by the time it runs, a
     *                          different sink instance is registered under {@code segmentIdentifier}
     * @param z                 when {@code true}, the segment is additionally removed from the commit
     *                          file and its persist directory is deleted
     * @return a future that completes with {@code null} once the deferred cleanup has run; it fails if
     *         rewriting the commit file fails
     */
    private ListenableFuture<?> abandonSegment(final SegmentIdentifier segmentIdentifier, final Sink sink, final boolean z) {
        sink.finishWriting();
        this.droppingSinks.add(segmentIdentifier);
        this.rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
        return Futures.transform(pushBarrier(), new Function<Object, Object>() {
            @Override // org.apache.hive.druid.com.google.common.base.Function
            @Nullable
            public Object apply(@Nullable Object obj) {
                // Only abandon the sink if it is still the one that was originally requested.
                if (AppenderatorImpl.this.sinks.get(segmentIdentifier) != sink) {
                    // The message has a %s placeholder, so the identifier must be supplied.
                    AppenderatorImpl.log.warn("Sink for segment[%s] no longer valid, not abandoning.", segmentIdentifier);
                    return null;
                }
                if (z) {
                    AppenderatorImpl.log.info("Removing commit metadata for segment[%s].", segmentIdentifier);
                    try {
                        File computeCommitFile = AppenderatorImpl.this.computeCommitFile();
                        if (computeCommitFile.exists()) {
                            // Read-modify-write: rewrite the commit file without this segment's entry.
                            AppenderatorImpl.this.objectMapper.writeValue(computeCommitFile, ((Committed) AppenderatorImpl.this.objectMapper.readValue(computeCommitFile, Committed.class)).without(segmentIdentifier.getIdentifierAsString()));
                        }
                    } catch (Exception e) {
                        AppenderatorImpl.log.makeAlert(e, "Failed to update committed segments[%s]", AppenderatorImpl.this.schema.getDataSource()).addData("identifier", segmentIdentifier.getIdentifierAsString()).emit();
                        throw Throwables.propagate(e);
                    }
                }
                try {
                    AppenderatorImpl.this.segmentAnnouncer.unannounceSegment(sink.getSegment());
                } catch (Exception e2) {
                    // Best effort: a failed unannouncement is alerted but does not stop the cleanup.
                    AppenderatorImpl.log.makeAlert(e2, "Failed to unannounce segment[%s]", AppenderatorImpl.this.schema.getDataSource()).addData("identifier", segmentIdentifier.getIdentifierAsString()).emit();
                }
                AppenderatorImpl.log.info("Removing sink for segment[%s].", segmentIdentifier);
                AppenderatorImpl.this.sinks.remove(segmentIdentifier);
                AppenderatorImpl.this.metrics.setSinkCount(AppenderatorImpl.this.sinks.size());
                AppenderatorImpl.this.droppingSinks.remove(segmentIdentifier);
                // NOTE(review): the (Interval) cast on the version looks like a decompiler artifact;
                // confirm against the declared type parameters of sinkTimeline before relying on it.
                AppenderatorImpl.this.sinkTimeline.remove(sink.getInterval(), (Interval) sink.getVersion(), segmentIdentifier.getShardSpec().createChunk(sink));
                Iterator<FireHydrant> it2 = sink.iterator();
                while (it2.hasNext()) {
                    FireHydrant next = it2.next();
                    if (AppenderatorImpl.this.cache != null) {
                        AppenderatorImpl.this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(next));
                    }
                }
                if (!z) {
                    return null;
                }
                AppenderatorImpl.this.removeDirectory(AppenderatorImpl.this.computePersistDir(segmentIdentifier));
                return null;
            }
        }, this.persistExecutor);
    }

    /**
     * Locates the commit file inside the base persist directory.
     *
     * <p>JADX note: the access modifier was changed from private by the decompiler.
     *
     * @return the {@code commit.json} file under the configured base persist directory
     */
    public File computeCommitFile() {
        final String commitFileName = "commit.json";
        return new File(tuningConfig.getBasePersistDirectory(), commitFileName);
    }

    /**
     * Locates the lock file inside the base persist directory.
     *
     * @return the {@code .lock} file under the configured base persist directory
     */
    private File computeLockFile() {
        final String lockFileName = ".lock";
        return new File(tuningConfig.getBasePersistDirectory(), lockFileName);
    }

    /**
     * Locates the persist directory of a segment: a child of the base persist directory named
     * after the segment's identifier string.
     *
     * <p>JADX note: the access modifier was changed from private by the decompiler.
     *
     * @param segmentIdentifier segment whose persist directory is wanted
     * @return the (possibly not yet existing) persist directory for the segment
     */
    public File computePersistDir(SegmentIdentifier segmentIdentifier) {
        final File persistDir = new File(
            tuningConfig.getBasePersistDirectory(),
            segmentIdentifier.getIdentifierAsString()
        );
        return persistDir;
    }

    /**
     * Locates the identifier file inside a segment's persist directory.
     *
     * @param segmentIdentifier segment whose identifier file is wanted
     * @return the file named {@code IDENTIFIER_FILE_NAME} under the segment's persist directory
     */
    private File computeIdentifierFile(SegmentIdentifier segmentIdentifier) {
        final File persistDir = computePersistDir(segmentIdentifier);
        return new File(persistDir, IDENTIFIER_FILE_NAME);
    }

    /**
     * Locates the descriptor file inside a segment's persist directory.
     *
     * @param segmentIdentifier segment whose descriptor file is wanted
     * @return the {@code descriptor.json} file under the segment's persist directory
     */
    private File computeDescriptorFile(SegmentIdentifier segmentIdentifier) {
        final File persistDir = computePersistDir(segmentIdentifier);
        return new File(persistDir, "descriptor.json");
    }

    /**
     * Makes sure the segment's persist directory exists and (re)writes its identifier file.
     *
     * <p>Only the final path component is created ({@code mkdir}, not {@code mkdirs}); an already
     * existing path is accepted as-is.
     *
     * @param segmentIdentifier segment whose persist directory should exist
     * @return the segment's persist directory
     * @throws IOException if the directory could not be created and does not exist, or if writing
     *                     the identifier file fails
     */
    private File createPersistDirIfNeeded(SegmentIdentifier segmentIdentifier) throws IOException {
        final File persistDir = computePersistDir(segmentIdentifier);
        final boolean createdNow = persistDir.mkdir();
        if (!createdNow) {
            // mkdir() also returns false when the path is already there, which is fine.
            if (!persistDir.exists()) {
                throw new IOException(String.format("Could not create directory: %s", persistDir));
            }
        }
        final File identifierFile = computeIdentifierFile(segmentIdentifier);
        this.objectMapper.writeValue(identifierFile, segmentIdentifier);
        return persistDir;
    }

    /**
     * Persists a hydrant's index to disk and swaps the hydrant over to the persisted, reloaded index.
     *
     * <p>The whole operation runs while holding the hydrant's monitor. A hydrant that reports
     * {@code hasSwapped()} is left untouched. Otherwise the index is written to a sub-directory of
     * the segment's persist directory named after the hydrant's count, loaded back through
     * {@code indexIO}, and installed via {@code swapSegment}.
     *
     * <p>JADX note: the access modifier was changed from private by the decompiler.
     *
     * @param fireHydrant       the hydrant to persist
     * @param segmentIdentifier identifier of the segment the hydrant belongs to
     * @return {@code 0} if the hydrant had already been swapped; otherwise the value of
     *         {@code getIndex().size()} captured before persisting (presumably the row count)
     * @throws RuntimeException wrapping the {@link IOException} if persisting or loading fails
     */
    public int persistHydrant(FireHydrant fireHydrant, SegmentIdentifier segmentIdentifier) {
        synchronized (fireHydrant) {
            if (fireHydrant.hasSwapped()) {
                log.info("Segment[%s], Hydrant[%s] already swapped. Ignoring request to persist.", segmentIdentifier, fireHydrant);
                return 0;
            }
            log.info("Segment[%s], persisting Hydrant[%s]", segmentIdentifier, fireHydrant);
            try {
                // Capture the size before the swap replaces the in-memory index.
                int size = fireHydrant.getIndex().size();
                fireHydrant.swapSegment(
                    new QueryableIndexSegment(
                        fireHydrant.getSegment().getIdentifier(),
                        this.indexIO.loadIndex(
                            this.indexMerger.persist(
                                fireHydrant.getIndex(),
                                segmentIdentifier.getInterval(),
                                new File(createPersistDirIfNeeded(segmentIdentifier), String.valueOf(fireHydrant.getCount())),
                                this.tuningConfig.getIndexSpec()))));
                return size;
            } catch (IOException e) {
                // Attach the cause to the alert, matching the other alerts emitted by this class.
                log.makeAlert(e, "dataSource[%s] -- incremental persist failed", this.schema.getDataSource()).addData("segment", segmentIdentifier.getIdentifierAsString()).addData("count", Integer.valueOf(fireHydrant.getCount())).emit();
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Deletes a directory if it exists, on a best-effort basis.
     *
     * <p>A failure to delete is reported through an alert and otherwise ignored; nothing is thrown
     * to the caller.
     *
     * <p>JADX note: the access modifier was changed from private by the decompiler.
     *
     * @param file directory to delete; a non-existent path is a no-op
     */
    public void removeDirectory(File file) {
        if (!file.exists()) {
            return;
        }
        try {
            log.info("Deleting Index File[%s]", file);
            FileUtils.deleteDirectory(file);
        } catch (Exception deleteFailure) {
            log.makeAlert(deleteFailure, "Failed to remove directory[%s]", this.schema.getDataSource())
                .addData("file", file)
                .emit();
        }
    }
}
