package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;

import com.microsoft.azure.cosmosdb.AccessCondition;
import com.microsoft.azure.cosmosdb.AccessConditionType;
import com.microsoft.azure.cosmosdb.Database;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.RequestOptions;
import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.SqlParameter;
import com.microsoft.azure.cosmosdb.SqlParameterCollection;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/* loaded from: input_file:org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.class */
/**
 * {@link DocumentStoreWriter} implementation that persists timeline documents
 * into Azure Cosmos DB through a shared {@link AsyncDocumentClient}.
 *
 * <p>The client is held in a static field and is therefore shared by every
 * instance of this class in the JVM; creation and disposal of the client are
 * guarded by the class lock. Each instance owns its own fixed-size thread pool
 * on which upsert subscriptions are scheduled.
 *
 * @param <TimelineDoc> concrete timeline document type handled by this writer
 */
public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument> implements DocumentStoreWriter<TimelineDoc> {
    private static final Logger LOG = LoggerFactory.getLogger(CosmosDBDocumentStoreWriter.class);
    private static final PerNodeAggTimelineCollectorMetrics METRICS = PerNodeAggTimelineCollectorMetrics.getInstance();

    private static final String DATABASE_LINK = "/dbs/%s";
    private static final String COLLECTION_LINK = "/dbs/%s/colls/%s";
    private static final String DOCUMENT_LINK = "/dbs/%s/colls/%s/docs/%s";
    private static final String ID = "@id";
    private static final String QUERY_COLLECTION_IF_EXISTS = "SELECT * FROM r where r.id = @id";

    private static final int HTTP_NOT_FOUND = 404;
    private static final int HTTP_CONFLICT = 409;

    /** Shared by all writer instances; guarded by the class lock on create/close. */
    private static AsyncDocumentClient client;

    private final String databaseName;
    /** One thread per collection type; used to run the upsert subscriptions. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(CollectionType.values().length);
    private final Scheduler schedulerForBlockingWork = Schedulers.from(this.executorService);

    /**
     * Creates a writer for the database named in the configuration and lazily
     * creates the shared Cosmos DB client if it does not exist yet.
     *
     * @param configuration configuration from which the database name and the
     *                      client settings are read
     */
    public CosmosDBDocumentStoreWriter(Configuration configuration) {
        LOG.info("Initializing Cosmos DB DocumentStoreWriter...");
        this.databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(configuration);
        initCosmosDBClient(configuration);
    }

    /**
     * Creates the shared client on first use. The class lock (not the instance
     * lock) is taken because {@link #client} is static.
     */
    private void initCosmosDBClient(Configuration configuration) {
        synchronized (CosmosDBDocumentStoreWriter.class) {
            if (client == null) {
                LOG.info("Creating Cosmos DB Writer Async Client...");
                client = DocumentStoreUtils.createCosmosDBAsyncClient(configuration);
                addShutdownHook();
            }
        }
    }

    /**
     * Ensures the configured database exists: reads it, and creates it when the
     * read fails with a 404. Any other read failure is logged and propagated.
     * Blocks until the operation completes.
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter
    public void createDatabase() {
        client.readDatabase(String.format(DATABASE_LINK, this.databaseName), (RequestOptions) null).doOnNext(resourceResponse -> {
            LOG.info("Database {} already exists.", this.databaseName);
        }).onErrorResumeNext(th -> {
            boolean databaseMissing = th instanceof DocumentClientException
                && ((DocumentClientException) th).getStatusCode() == HTTP_NOT_FOUND;
            if (!databaseMissing) {
                // Unexpected failure while probing for the database: pass it up.
                LOG.error("Reading database : {} if it exists failed.", this.databaseName, th);
                return Observable.error(th);
            }
            LOG.info("Creating new Database : {}", this.databaseName);
            Database database = new Database();
            database.setId(this.databaseName);
            return client.createDatabase(database, (RequestOptions) null);
        }).toCompletable().await();
    }

    /**
     * Ensures a collection with the given id exists in the configured database:
     * queries for it by id and creates it when the query returns no results.
     * Errors are logged and propagated. Blocks until the operation completes.
     *
     * @param str id of the collection to create
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter
    public void createCollection(String str) {
        LOG.info("Creating Timeline Collection : {} for Database : {}", str, this.databaseName);
        String databaseLink = String.format(DATABASE_LINK, this.databaseName);
        SqlQuerySpec existsQuery = new SqlQuerySpec(QUERY_COLLECTION_IF_EXISTS,
            new SqlParameterCollection(new SqlParameter[]{new SqlParameter(ID, str)}));
        client.queryCollections(databaseLink, existsQuery, (FeedOptions) null).single().flatMap(feedResponse -> {
            if (!feedResponse.getResults().isEmpty()) {
                LOG.info("Collection {} already exists.", str);
                return Observable.empty();
            }
            DocumentCollection documentCollection = new DocumentCollection();
            documentCollection.setId(str);
            LOG.info("Creating collection {}", str);
            return client.createCollection(databaseLink, documentCollection, (RequestOptions) null);
        }).doOnError(th -> {
            LOG.error("Unable to create collection : {}", str, th);
        }).toCompletable().await();
    }

    /**
     * Upserts the document into the given collection and records the elapsed
     * time in the put-entities latency metric together with a success flag.
     * Exceptions raised by the upsert are logged and not rethrown.
     *
     * @param timelinedoc    document to write
     * @param collectionType collection the document belongs to
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter
    public void writeDocument(TimelineDoc timelinedoc, CollectionType collectionType) {
        // Argument order matches the placeholders: collection, entity type, database.
        LOG.debug("Upserting document under collection : {} with  entity type : {} under Database {}",
            collectionType.getCollectionName(), timelinedoc.getType(), this.databaseName);
        boolean succeeded = false;
        long startTime = Time.monotonicNow();
        try {
            upsertDocument(collectionType, timelinedoc);
            succeeded = true;
        } catch (Exception e) {
            LOG.error("Unable to perform upsert for Document Id : {} under Collection : {} under Database {}",
                timelinedoc.getId(), collectionType.getCollectionName(), this.databaseName, e);
        } finally {
            // Recorded exactly once on every path, including when an Error propagates.
            METRICS.addPutEntitiesLatency(Time.monotonicNow() - startTime, succeeded);
        }
    }

    /**
     * Merges the document with the currently stored version (if any) and
     * upserts the result using an If-Match access condition built from the
     * stored version's ETag. A 409 response triggers another attempt.
     */
    private void upsertDocument(CollectionType collectionType, TimelineDoc timelinedoc) {
        String collectionLink = String.format(COLLECTION_LINK, this.databaseName, collectionType.getCollectionName());

        // eTag is filled in by applyUpdatesOnPrevDoc when a stored version exists.
        StringBuilder eTag = new StringBuilder();
        TimelineDoc mergedDoc = applyUpdatesOnPrevDoc(collectionType, timelinedoc, eTag);

        AccessCondition accessCondition = new AccessCondition();
        accessCondition.setCondition(eTag.toString());
        accessCondition.setType(AccessConditionType.IfMatch);
        RequestOptions requestOptions = new RequestOptions();
        requestOptions.setAccessCondition(accessCondition);

        ResourceResponse<Document> response = client.upsertDocument(collectionLink, mergedDoc, requestOptions, true)
            .subscribeOn(this.schedulerForBlockingWork)
            .doOnError(th -> {
                LOG.error("Error while upserting Collection : {} with Doc Id : {} under Database : {}",
                    collectionType.getCollectionName(), mergedDoc.getId(), this.databaseName, th);
            })
            .toBlocking()
            .single();

        int statusCode = response.getStatusCode();
        if (statusCode == HTTP_CONFLICT) {
            // NOTE(review): the retry is unbounded; consider capping the attempts.
            LOG.warn("There was a conflict while upserting, hence retrying...");
            upsertDocument(collectionType, mergedDoc);
        } else if (statusCode >= 200 && statusCode < 300) {
            LOG.debug("Successfully wrote doc with id : {} and type : {} under Database : {}",
                timelinedoc.getId(), timelinedoc.getType(), this.databaseName);
        }
    }

    /**
     * Fetches the stored version of the document and, when one exists, merges
     * the incoming document into it.
     *
     * @param collectionType collection to read from
     * @param timelinedoc    incoming document
     * @param sb             receives the ETag of the stored version, if any
     * @return the stored document with the incoming one merged in, or the
     *         incoming document itself when no stored version was fetched
     */
    @VisibleForTesting
    TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType, TimelineDoc timelinedoc, StringBuilder sb) {
        TimelineDoc previousDoc = fetchLatestDoc(collectionType, timelinedoc.getId(), sb);
        if (previousDoc == null) {
            return timelinedoc;
        }
        previousDoc.merge(timelinedoc);
        return previousDoc;
    }

    /**
     * Reads the document with the given id and converts it to the document
     * class that corresponds to the collection type.
     *
     * @param collectionType collection to read from
     * @param str            id of the document
     * @param sb             receives the ETag of the document that was read
     * @return the stored document, or {@code null} when it could not be read
     *         (a missing document is logged at debug, any other failure at warn)
     */
    @VisibleForTesting
    TimelineDoc fetchLatestDoc(CollectionType collectionType, String str, StringBuilder sb) {
        String documentLink = String.format(DOCUMENT_LINK, this.databaseName, collectionType.getCollectionName(), str);
        try {
            ResourceResponse<Document> response = client.readDocument(documentLink, new RequestOptions()).toBlocking().single();
            Document resource = response.getResource();
            TimelineDocument timelineDocument;
            switch (collectionType) {
                case FLOW_RUN:
                    timelineDocument = resource.toObject(FlowRunDocument.class);
                    break;
                case FLOW_ACTIVITY:
                    timelineDocument = resource.toObject(FlowActivityDocument.class);
                    break;
                default:
                    timelineDocument = resource.toObject(TimelineEntityDocument.class);
                    break;
            }
            sb.append(resource.getETag());
            // The caller selects the collection type that matches TimelineDoc.
            @SuppressWarnings("unchecked")
            TimelineDoc latestDoc = (TimelineDoc) timelineDocument;
            return latestDoc;
        } catch (Exception e) {
            if (isNotFound(e)) {
                LOG.debug("No previous Document found with id : {} for Collection : {} under Database : {}",
                    str, collectionType.getCollectionName(), this.databaseName);
            } else {
                // Still best-effort (the caller proceeds without a previous version),
                // but a real failure must not be disguised as "document not found".
                LOG.warn("Unable to read previous Document with id : {} for Collection : {} under Database : {}",
                    str, collectionType.getCollectionName(), this.databaseName, e);
            }
            return null;
        }
    }

    /**
     * Returns whether the error, or the first {@link DocumentClientException}
     * in its cause chain, carries a 404 status code.
     */
    private static boolean isNotFound(Throwable error) {
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            if (cause instanceof DocumentClientException) {
                return ((DocumentClientException) cause).getStatusCode() == HTTP_NOT_FOUND;
            }
        }
        return false;
    }

    /**
     * Closes the shared client (if it is still open) and shuts down this
     * instance's thread pool. Safe to call more than once.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        synchronized (CosmosDBDocumentStoreWriter.class) {
            if (client != null) {
                LOG.info("Closing Cosmos DB Writer Async Client...");
                client.close();
                client = null;
            }
        }
        // Release the worker threads owned by this instance; shutdown() has no
        // additional effect when the pool is already shut down.
        this.executorService.shutdown();
    }

    /** Registers a JVM shutdown hook that shuts down this instance's thread pool. */
    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> this.executorService.shutdown()));
    }
}
