/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.druid.io;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.druid.serde.DruidWritable;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Suppliers;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.FluentIterable;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.MapBasedInputRow;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.granularity.Granularity;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.hive.druid.io.druid.segment.loading.DataSegmentPusher;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderator;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderators;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Committers;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.partition.LinearShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DruidRecordWriter
implements RecordWriter<NullWritable, DruidWritable>,
FileSinkOperator.RecordWriter {
    protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);
    private final DataSchema dataSchema;
    private final Appenderator appenderator;
    private final RealtimeTuningConfig tuningConfig;
    private final Path segmentsDescriptorDir;
    private SegmentIdentifier currentOpenSegment = null;
    private final int maxPartitionSize;
    private final FileSystem fileSystem;
    private final Supplier<Committer> committerSupplier;
    private final Granularity segmentGranularity;

    public DruidRecordWriter(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, DataSegmentPusher dataSegmentPusher, int maxPartitionSize, Path segmentsDescriptorsDir, FileSystem fileSystem) {
        File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(), UUID.randomUUID().toString());
        this.tuningConfig = Preconditions.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir), "realtimeTuningConfig is null");
        this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
        this.appenderator = Appenderators.createOffline(this.dataSchema, this.tuningConfig, new FireDepartmentMetrics(), dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9);
        this.maxPartitionSize = maxPartitionSize;
        this.appenderator.startJob();
        this.segmentsDescriptorDir = Preconditions.checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
        this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
        this.segmentGranularity = this.dataSchema.getGranularitySpec().getSegmentGranularity();
        this.committerSupplier = Suppliers.ofInstance(Committers.nil());
    }

    private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
        DateTime truncatedDateTime = this.segmentGranularity.bucketStart(DateTimes.utc(truncatedTime));
        Interval interval = new Interval((ReadableInstant)truncatedDateTime, (ReadableInstant)this.segmentGranularity.increment(truncatedDateTime));
        if (this.currentOpenSegment == null) {
            this.currentOpenSegment = new SegmentIdentifier(this.dataSchema.getDataSource(), interval, this.tuningConfig.getVersioningPolicy().getVersion(interval), new LinearShardSpec(0));
            return this.currentOpenSegment;
        }
        if (this.currentOpenSegment.getInterval().equals((Object)interval)) {
            SegmentIdentifier retVal = this.currentOpenSegment;
            int rowCount = this.appenderator.getRowCount(retVal);
            if (rowCount < this.maxPartitionSize) {
                return retVal;
            }
            retVal = new SegmentIdentifier(this.dataSchema.getDataSource(), interval, this.tuningConfig.getVersioningPolicy().getVersion(interval), new LinearShardSpec(this.currentOpenSegment.getShardSpec().getPartitionNum() + 1));
            this.pushSegments(Lists.newArrayList(this.currentOpenSegment));
            LOG.info("Creating new partition for segment {}, partition num {}", (Object)retVal.getIdentifierAsString(), (Object)retVal.getShardSpec().getPartitionNum());
            this.currentOpenSegment = retVal;
            return retVal;
        }
        SegmentIdentifier retVal = new SegmentIdentifier(this.dataSchema.getDataSource(), interval, this.tuningConfig.getVersioningPolicy().getVersion(interval), new LinearShardSpec(0));
        this.pushSegments(Lists.newArrayList(this.currentOpenSegment));
        LOG.info("Creating segment {}", (Object)retVal.getIdentifierAsString());
        this.currentOpenSegment = retVal;
        return retVal;
    }

    private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
        try {
            SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata)this.appenderator.push(segmentsToPush, this.committerSupplier.get()).get();
            HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<String>();
            for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
                pushedSegmentIdentifierHashSet.add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
                Path segmentDescriptorOutputPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(pushedSegment, this.segmentsDescriptorDir);
                DruidStorageHandlerUtils.writeSegmentDescriptor(this.fileSystem, pushedSegment, segmentDescriptorOutputPath);
                LOG.info(String.format("Pushed the segment [%s] and persisted the descriptor located at [%s]", pushedSegment, segmentDescriptorOutputPath));
            }
            HashSet<String> toPushSegmentsHashSet = new HashSet<String>(FluentIterable.from(segmentsToPush).transform(new Function<SegmentIdentifier, String>(){

                @Override
                @Nullable
                public String apply(@Nullable SegmentIdentifier input) {
                    return input.getIdentifierAsString();
                }
            }).toList());
            if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
                throw new IllegalStateException(String.format("was asked to publish [%s] but was able to publish only [%s]", Joiner.on(", ").join(toPushSegmentsHashSet), Joiner.on(", ").join(pushedSegmentIdentifierHashSet)));
            }
            for (SegmentIdentifier dataSegmentId : segmentsToPush) {
                LOG.info("Dropping segment {}", (Object)dataSegmentId.toString());
                this.appenderator.drop(dataSegmentId).get();
            }
            LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
        }
        catch (InterruptedException e) {
            LOG.error(String.format("got interrupted, failed to push  [%,d] segments.", segmentsToPush.size()), (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (IOException | ExecutionException e) {
            LOG.error(String.format("Failed to push  [%,d] segments.", segmentsToPush.size()), (Throwable)e);
            Throwables.propagate(e);
        }
    }

    public void write(Writable w) throws IOException {
        block8: {
            DruidWritable record = (DruidWritable)w;
            long timestamp = (Long)record.getValue().get("__time");
            int partitionNumber = Math.toIntExact((Long)record.getValue().getOrDefault("__druid_extra_partition_key", -1L));
            MapBasedInputRow inputRow = new MapBasedInputRow(timestamp, this.dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), record.getValue());
            try {
                if (partitionNumber != -1 && this.maxPartitionSize == -1) {
                    DateTime truncatedDateTime = this.segmentGranularity.bucketStart(DateTimes.utc(timestamp));
                    Interval interval = new Interval((ReadableInstant)truncatedDateTime, (ReadableInstant)this.segmentGranularity.increment(truncatedDateTime));
                    if (this.currentOpenSegment != null) {
                        if (this.currentOpenSegment.getShardSpec().getPartitionNum() != partitionNumber || !this.currentOpenSegment.getInterval().equals((Object)interval)) {
                            this.pushSegments(ImmutableList.of(this.currentOpenSegment));
                            this.currentOpenSegment = new SegmentIdentifier(this.dataSchema.getDataSource(), interval, this.tuningConfig.getVersioningPolicy().getVersion(interval), new LinearShardSpec(partitionNumber));
                        }
                    } else if (this.currentOpenSegment == null) {
                        this.currentOpenSegment = new SegmentIdentifier(this.dataSchema.getDataSource(), interval, this.tuningConfig.getVersioningPolicy().getVersion(interval), new LinearShardSpec(partitionNumber));
                    }
                    this.appenderator.add(this.currentOpenSegment, inputRow, this.committerSupplier);
                    break block8;
                }
                if (partitionNumber == -1 && this.maxPartitionSize != -1) {
                    this.appenderator.add(this.getSegmentIdentifierAndMaybePush(timestamp), inputRow, this.committerSupplier);
                    break block8;
                }
                throw new IllegalArgumentException(String.format("partitionNumber and  maxPartitionSize should be mutually exclusive got partitionNum [%s] and maxPartitionSize [%s]", partitionNumber, this.maxPartitionSize));
            }
            catch (SegmentNotWritableException e) {
                throw new IOException(e);
            }
        }
    }

    public void close(boolean abort) throws IOException {
        try {
            if (!abort) {
                ArrayList<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
                segmentsToPush.addAll(this.appenderator.getSegments());
                this.pushSegments(segmentsToPush);
            }
            this.appenderator.clear();
        }
        catch (InterruptedException e) {
            Throwables.propagate(e);
        }
        finally {
            try {
                FileUtils.deleteDirectory((File)this.tuningConfig.getBasePersistDirectory());
            }
            catch (Exception e) {
                LOG.error("error cleaning of base persist directory", (Throwable)e);
            }
            this.appenderator.close();
        }
    }

    public void write(NullWritable key, DruidWritable value) throws IOException {
        this.write(value);
    }

    public void close(Reporter reporter) throws IOException {
        this.close(false);
    }
}

