package org.apache.hive.druid.io.druid.segment.realtime;

import com.google.inject.Inject;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.FirehoseV2;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.guava.CloseQuietly;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.hive.druid.io.druid.query.FinalizeResultsQueryRunner;
import org.apache.hive.druid.io.druid.query.NoopQueryRunner;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactory;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.hive.druid.io.druid.query.QuerySegmentWalker;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.query.spec.SpecificSegmentSpec;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Committers;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Plumber;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Plumbers;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/RealtimeManager.class */
public class RealtimeManager implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(RealtimeManager.class);
    private final List<FireDepartment> fireDepartments;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final Map<String, Map<Integer, FireChief>> chiefs;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/RealtimeManager$FireChief.class */
    public static class FireChief extends Thread implements Closeable {
        private final FireDepartment fireDepartment;
        private final FireDepartmentMetrics metrics;
        private final RealtimeTuningConfig config;
        private final QueryRunnerFactoryConglomerate conglomerate;
        private volatile Firehose firehose = null;
        private volatile FirehoseV2 firehoseV2 = null;
        private volatile Plumber plumber = null;
        private volatile boolean normalExit = true;

        public FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate) {
            this.fireDepartment = fireDepartment;
            this.conglomerate = queryRunnerFactoryConglomerate;
            this.config = fireDepartment.getTuningConfig();
            this.metrics = fireDepartment.getMetrics();
        }

        public Firehose initFirehose() {
            Firehose firehose;
            synchronized (this) {
                if (this.firehose == null) {
                    try {
                        RealtimeManager.log.info("Calling the FireDepartment and getting a Firehose.", new Object[0]);
                        this.firehose = this.fireDepartment.connect();
                        RealtimeManager.log.info("Firehose acquired!", new Object[0]);
                    } catch (IOException e) {
                        throw Throwables.propagate(e);
                    }
                } else {
                    RealtimeManager.log.warn("Firehose already connected, skipping initFirehose().", new Object[0]);
                }
                firehose = this.firehose;
            }
            return firehose;
        }

        public FirehoseV2 initFirehoseV2(Object obj) {
            FirehoseV2 firehoseV2;
            synchronized (this) {
                if (this.firehoseV2 == null) {
                    try {
                        RealtimeManager.log.info("Calling the FireDepartment and getting a FirehoseV2.", new Object[0]);
                        this.firehoseV2 = this.fireDepartment.connect(obj);
                        RealtimeManager.log.info("FirehoseV2 acquired!", new Object[0]);
                    } catch (IOException e) {
                        throw Throwables.propagate(e);
                    }
                } else {
                    RealtimeManager.log.warn("FirehoseV2 already connected, skipping initFirehoseV2().", new Object[0]);
                }
                firehoseV2 = this.firehoseV2;
            }
            return firehoseV2;
        }

        public Plumber initPlumber() {
            Plumber plumber;
            synchronized (this) {
                if (this.plumber == null) {
                    RealtimeManager.log.info("Someone get us a plumber!", new Object[0]);
                    this.plumber = this.fireDepartment.findPlumber();
                    RealtimeManager.log.info("We have our plumber!", new Object[0]);
                } else {
                    RealtimeManager.log.warn("Plumber already trained, skipping initPlumber().", new Object[0]);
                }
                plumber = this.plumber;
            }
            return plumber;
        }

        public FireDepartmentMetrics getMetrics() {
            return this.metrics;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.plumber = initPlumber();
            try {
                try {
                    Object startJob = this.plumber.startJob();
                    if (this.fireDepartment.checkFirehoseV2()) {
                        this.firehoseV2 = initFirehoseV2(startJob);
                        runFirehoseV2(this.firehoseV2);
                    } else {
                        this.firehose = initFirehose();
                        runFirehose(this.firehose);
                    }
                } catch (Error e) {
                    RealtimeManager.log.makeAlert(e, "Exception aborted realtime processing[%s]", this.fireDepartment.getDataSchema().getDataSource()).emit();
                    this.normalExit = false;
                    throw e;
                } catch (RuntimeException e2) {
                    RealtimeManager.log.makeAlert(e2, "RuntimeException aborted realtime processing[%s]", this.fireDepartment.getDataSchema().getDataSource()).emit();
                    this.normalExit = false;
                    throw e2;
                }
            } finally {
                CloseQuietly.close(this.firehose);
                if (this.normalExit) {
                    this.plumber.finishJob();
                    this.plumber = null;
                    this.firehose = null;
                }
            }
        }

        private void runFirehoseV2(FirehoseV2 firehoseV2) {
            try {
                firehoseV2.start();
                RealtimeManager.log.info("FirehoseV2 started", new Object[0]);
                Supplier<Committer> supplierFromFirehoseV2 = Committers.supplierFromFirehoseV2(firehoseV2);
                boolean z = true;
                while (z) {
                    InputRow inputRow = null;
                    try {
                        inputRow = firehoseV2.currRow();
                        if (inputRow == null) {
                            RealtimeManager.log.debug("thrown away null input row, considering unparseable", new Object[0]);
                            this.metrics.incrementUnparseable();
                        } else if (this.plumber.add(inputRow, supplierFromFirehoseV2) < 0) {
                            this.metrics.incrementThrownAway();
                            RealtimeManager.log.debug("Throwing away event[%s]", inputRow);
                        } else {
                            this.metrics.incrementProcessed();
                        }
                    } catch (Exception e) {
                        RealtimeManager.log.makeAlert(e, "Unknown exception, Ignoring and continuing.", new Object[0]).addData("inputRow", inputRow);
                    }
                    try {
                        z = firehoseV2.advance();
                    } catch (Exception e2) {
                        RealtimeManager.log.debug(e2, "exception in firehose.advance(), considering unparseable row", new Object[0]);
                        this.metrics.incrementUnparseable();
                    }
                }
            } catch (Exception e3) {
                RealtimeManager.log.error(e3, "Failed to start firehoseV2", new Object[0]);
            }
        }

        private void runFirehose(Firehose firehose) {
            Supplier<Committer> supplierFromFirehose = Committers.supplierFromFirehose(firehose);
            while (firehose.hasMore()) {
                Plumbers.addNextRow(supplierFromFirehose, firehose, this.plumber, this.config.isReportParseExceptions(), this.metrics);
            }
        }

        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            return new FinalizeResultsQueryRunner(this.plumber.getQueryRunner(query), this.conglomerate.findFactory(query).getToolchest());
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            synchronized (this) {
                if (this.firehose != null) {
                    this.normalExit = false;
                    this.firehose.close();
                }
            }
        }
    }

    @Inject
    public RealtimeManager(List<FireDepartment> list, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate) {
        this.fireDepartments = list;
        this.conglomerate = queryRunnerFactoryConglomerate;
        this.chiefs = Maps.newHashMap();
    }

    RealtimeManager(List<FireDepartment> list, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, Map<String, Map<Integer, FireChief>> map) {
        this.fireDepartments = list;
        this.conglomerate = queryRunnerFactoryConglomerate;
        this.chiefs = map;
    }

    @LifecycleStart
    public void start() throws IOException {
        for (FireDepartment fireDepartment : this.fireDepartments) {
            DataSchema dataSchema = fireDepartment.getDataSchema();
            FireChief fireChief = new FireChief(fireDepartment, this.conglomerate);
            Map<Integer, FireChief> map = this.chiefs.get(dataSchema.getDataSource());
            if (map == null) {
                map = new HashMap();
                this.chiefs.put(dataSchema.getDataSource(), map);
            }
            map.put(Integer.valueOf(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum()), fireChief);
            fireChief.setName(String.format("chief-%s[%s]", dataSchema.getDataSource(), Integer.valueOf(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum())));
            fireChief.setDaemon(true);
            fireChief.start();
        }
    }

    @LifecycleStop
    public void stop() {
        Iterator<Map<Integer, FireChief>> it2 = this.chiefs.values().iterator();
        while (it2.hasNext()) {
            Iterator<FireChief> it3 = it2.next().values().iterator();
            while (it3.hasNext()) {
                CloseQuietly.close(it3.next());
            }
        }
    }

    public FireDepartmentMetrics getMetrics(String str) {
        Map<Integer, FireChief> map = this.chiefs.get(str);
        if (map == null) {
            return null;
        }
        FireDepartmentMetrics fireDepartmentMetrics = null;
        for (FireChief fireChief : map.values()) {
            if (fireDepartmentMetrics == null) {
                fireDepartmentMetrics = fireChief.getMetrics().snapshot();
            } else {
                fireDepartmentMetrics.merge(fireChief.getMetrics());
            }
        }
        return fireDepartmentMetrics;
    }

    @Override // org.apache.hive.druid.io.druid.query.QuerySegmentWalker
    public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> iterable) {
        QueryRunnerFactory findFactory = this.conglomerate.findFactory(query);
        Map<Integer, FireChief> map = this.chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames()));
        return map == null ? new NoopQueryRunner() : findFactory.getToolchest().mergeResults(findFactory.mergeRunners(MoreExecutors.sameThreadExecutor(), Iterables.transform(map.values(), new Function<FireChief, QueryRunner<T>>() { // from class: org.apache.hive.druid.io.druid.segment.realtime.RealtimeManager.1
            @Override // org.apache.hive.druid.com.google.common.base.Function, java.util.function.Function
            public QueryRunner<T> apply(FireChief fireChief) {
                return fireChief.getQueryRunner(query);
            }
        })));
    }

    @Override // org.apache.hive.druid.io.druid.query.QuerySegmentWalker
    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> iterable) {
        QueryRunnerFactory findFactory = this.conglomerate.findFactory(query);
        final Map<Integer, FireChief> map = this.chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames()));
        return map == null ? new NoopQueryRunner() : findFactory.getToolchest().mergeResults(findFactory.mergeRunners(MoreExecutors.sameThreadExecutor(), Iterables.transform(iterable, new Function<SegmentDescriptor, QueryRunner<T>>() { // from class: org.apache.hive.druid.io.druid.segment.realtime.RealtimeManager.2
            @Override // org.apache.hive.druid.com.google.common.base.Function, java.util.function.Function
            public QueryRunner<T> apply(SegmentDescriptor segmentDescriptor) {
                FireChief fireChief = (FireChief) map.get(Integer.valueOf(segmentDescriptor.getPartitionNumber()));
                return fireChief == null ? new NoopQueryRunner() : fireChief.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentDescriptor)));
            }
        })));
    }
}
