package org.apache.hadoop.hive.druid.io;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.druid.serde.DruidWritable;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Suppliers;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.FluentIterable;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.MapBasedInputRow;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.hive.druid.io.druid.segment.loading.DataSegmentPusher;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderator;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderators;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Committers;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.partition.LinearShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/druid/io/DruidRecordWriter.class */
/**
 * Record writer that adds {@link DruidWritable} rows to a Druid {@link Appenderator} and pushes
 * the resulting segments, writing one segment descriptor file per pushed segment under the
 * configured descriptor directory.
 *
 * <p>At most one segment is kept open at a time. The open segment is pushed (and dropped from the
 * appenderator) when a row for a different interval arrives or when the open segment has reached
 * {@code maxPartitionSize} rows; in the latter case a new partition of the same interval is opened.
 */
public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>, FileSinkOperator.RecordWriter {
    protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);
    private final DataSchema dataSchema;
    private final Appenderator appenderator;
    private final RealtimeTuningConfig tuningConfig;
    private final Path segmentsDescriptorDir;
    /** Segment currently receiving rows; {@code null} until the first row has been written. */
    private SegmentIdentifier currentOpenSegment = null;
    private final Integer maxPartitionSize;
    private final FileSystem fileSystem;
    private final Supplier<Committer> committerSupplier;

    /**
     * Creates the writer, builds an offline appenderator and starts its job.
     *
     * <p>All arguments are validated before the appenderator is created, so a rejected argument
     * never leaves a started appenderator behind.
     *
     * @param dataSchema           schema of the data source being written; must not be null
     * @param realtimeTuningConfig tuning configuration; must not be null. A copy whose base persist
     *                             directory is a fresh UUID-named child of the configured one is used.
     * @param dataSegmentPusher    pusher handed to the appenderator
     * @param i                    maximum number of rows per segment partition; must be greater than 0
     * @param path                 directory under which segment descriptors are written; must not be null
     * @param fileSystem           file system used to write segment descriptors; must not be null
     * @throws NullPointerException     if a required argument is null
     * @throws IllegalArgumentException if {@code i} is not greater than 0
     */
    public DruidRecordWriter(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, DataSegmentPusher dataSegmentPusher, int i, Path path, FileSystem fileSystem) {
        // Check the config itself before dereferencing it, so a null argument reports the intended message.
        Preconditions.checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
        File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(), UUID.randomUUID().toString());
        this.tuningConfig = Preconditions.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir), "realtimeTuningConfig is null");
        this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
        Preconditions.checkArgument(i > 0, "maxPartitionSize need to be greater than 0");
        this.maxPartitionSize = i;
        this.segmentsDescriptorDir = Preconditions.checkNotNull(path, "segmentsDescriptorsDir is null");
        this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
        this.committerSupplier = Suppliers.ofInstance(Committers.nil());
        // Side effects come last: nothing is created or started unless every argument is valid.
        this.appenderator = Appenderators.createOffline(this.dataSchema, this.tuningConfig, new FireDepartmentMetrics(), dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9);
        this.appenderator.startJob();
    }

    /**
     * Returns the segment that a row with the given truncated timestamp must be added to, pushing
     * the currently open segment first when the row belongs to another interval or when the open
     * segment already holds {@code maxPartitionSize} rows.
     *
     * @param j start of the row's segment-granularity bucket, in epoch milliseconds
     * @return the identifier of the segment that is open after this call
     */
    private SegmentIdentifier getSegmentIdentifierAndMaybePush(long j) {
        DateTime bucketStart = new DateTime(j);
        Interval interval = new Interval(bucketStart, this.dataSchema.getGranularitySpec().getSegmentGranularity().increment(bucketStart));

        if (this.currentOpenSegment == null) {
            // Very first row: open partition 0 of its interval, nothing to push yet.
            this.currentOpenSegment = newSegmentIdentifier(interval, 0);
            return this.currentOpenSegment;
        }

        if (!this.currentOpenSegment.getInterval().equals(interval)) {
            // Row belongs to a different interval: push the open segment and start partition 0 of the new one.
            SegmentIdentifier nextSegment = newSegmentIdentifier(interval, 0);
            pushSegments(Lists.newArrayList(this.currentOpenSegment));
            LOG.info("Creating segment {}", nextSegment.getIdentifierAsString());
            this.currentOpenSegment = nextSegment;
            return nextSegment;
        }

        if (this.appenderator.getRowCount(this.currentOpenSegment) < this.maxPartitionSize.intValue()) {
            // Same interval and still room in the open partition.
            return this.currentOpenSegment;
        }

        // Same interval but the open partition is full: push it and roll over to the next partition number.
        SegmentIdentifier nextPartition = newSegmentIdentifier(interval, this.currentOpenSegment.getShardSpec().getPartitionNum() + 1);
        pushSegments(Lists.newArrayList(this.currentOpenSegment));
        LOG.info("Creating new partition for segment {}, partition num {}", nextPartition.getIdentifierAsString(), nextPartition.getShardSpec().getPartitionNum());
        this.currentOpenSegment = nextPartition;
        return nextPartition;
    }

    /** Builds an identifier for the given interval and linear partition number of this data source. */
    private SegmentIdentifier newSegmentIdentifier(Interval interval, int partitionNum) {
        return new SegmentIdentifier(this.dataSchema.getDataSource(), interval, this.tuningConfig.getVersioningPolicy().getVersion(interval), new LinearShardSpec(partitionNum));
    }

    /**
     * Pushes the given segments through the appenderator, writes a descriptor for every pushed
     * segment, verifies that exactly the requested segments were pushed, and then drops them from
     * the appenderator.
     *
     * @param list identifiers of the segments to push
     * @throws IllegalStateException if the set of pushed segments differs from the requested set
     * @throws RuntimeException      wrapping an {@link IOException} or {@link ExecutionException}
     *                               raised while pushing, writing descriptors or dropping
     */
    private void pushSegments(List<SegmentIdentifier> list) {
        try {
            SegmentsAndMetadata pushResult = this.appenderator.push(list, this.committerSupplier.get()).get();

            HashSet<String> pushedIds = new HashSet<>();
            for (DataSegment dataSegment : pushResult.getSegments()) {
                pushedIds.add(SegmentIdentifier.fromDataSegment(dataSegment).getIdentifierAsString());
                Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, this.segmentsDescriptorDir);
                DruidStorageHandlerUtils.writeSegmentDescriptor(this.fileSystem, dataSegment, descriptorPath);
                LOG.info(String.format("Pushed the segment [%s] and persisted the descriptor located at [%s]", dataSegment, descriptorPath));
            }

            HashSet<String> requestedIds = new HashSet<>();
            for (SegmentIdentifier requested : list) {
                requestedIds.add(requested.getIdentifierAsString());
            }

            if (!pushedIds.equals(requestedIds)) {
                throw new IllegalStateException(String.format("was asked to publish [%s] but was able to publish only [%s]", Joiner.on(", ").join(requestedIds), Joiner.on(", ").join(pushedIds)));
            }

            for (SegmentIdentifier pushed : list) {
                LOG.info("Dropping segment {}", pushed.toString());
                this.appenderator.drop(pushed).get();
            }
            LOG.info(String.format("Published [%,d] segments.", list.size()));
        } catch (IOException | ExecutionException e) {
            LOG.error(String.format("Failed to push  [%,d] segments.", list.size()), e);
            throw Throwables.propagate(e);
        } catch (InterruptedException e2) {
            // NOTE(review): an interrupt is logged and the flag restored, but the method returns
            // normally, so callers proceed as if the push had succeeded. Kept as-is because callers
            // may rely on it; confirm whether this should fail the write instead.
            LOG.error(String.format("got interrupted, failed to push  [%,d] segments.", list.size()), e2);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds one row to the appenderator. The row must be a {@link DruidWritable} whose value map
     * holds {@code Long} entries under {@code "__time"} (row timestamp) and
     * {@code "__time_granularity"} (timestamp used to select the target segment interval).
     *
     * @param writable the row to write
     * @throws IOException if the appenderator reports the target segment as not writable, or fails with an I/O error
     */
    public void write(Writable writable) throws IOException {
        DruidWritable row = (DruidWritable) writable;
        long timestamp = ((Long) row.getValue().get("__time")).longValue();
        long truncatedTimestamp = ((Long) row.getValue().get("__time_granularity")).longValue();
        SegmentIdentifier targetSegment = getSegmentIdentifierAndMaybePush(truncatedTimestamp);
        MapBasedInputRow inputRow = new MapBasedInputRow(timestamp, this.dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), row.getValue());
        try {
            this.appenderator.add(targetSegment, inputRow, this.committerSupplier);
        } catch (SegmentNotWritableException e) {
            throw new IOException(e);
        }
    }

    /**
     * Closes the writer. Unless {@code z} is true, every segment still held by the appenderator is
     * pushed first. The appenderator is then cleared, and in all cases the base persist directory
     * is deleted (failures are only logged) and the appenderator is closed.
     *
     * @param z {@code true} to abort, i.e. skip pushing the remaining segments
     * @throws IOException declared by the implemented interface
     */
    public void close(boolean z) throws IOException {
        try {
            if (!z) {
                List<SegmentIdentifier> segmentsToPush = new ArrayList<>(this.appenderator.getSegments());
                pushSegments(segmentsToPush);
            }
            this.appenderator.clear();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before rethrowing; cleanup still runs in the finally block.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } finally {
            try {
                FileUtils.deleteDirectory(this.tuningConfig.getBasePersistDirectory());
            } catch (Exception e3) {
                LOG.error("error cleaning of base persist directory", e3);
            }
            this.appenderator.close();
        }
    }

    /**
     * Writes {@code druidWritable}; the key is ignored.
     *
     * @throws IOException see {@link #write(Writable)}
     */
    public void write(NullWritable nullWritable, DruidWritable druidWritable) throws IOException {
        write(druidWritable);
    }

    /**
     * Closes the writer via {@code close(true)}; the reporter is ignored.
     *
     * <p>NOTE(review): {@code close(true)} takes the abort path, so segments still held by the
     * appenderator are NOT pushed on this code path. Confirm this is intended.
     *
     * @throws IOException see {@link #close(boolean)}
     */
    public void close(Reporter reporter) throws IOException {
        close(true);
    }
}
