/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;

import com.microsoft.azure.cosmosdb.AccessCondition;
import com.microsoft.azure.cosmosdb.AccessConditionType;
import com.microsoft.azure.cosmosdb.Database;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.RequestOptions;
import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.SqlParameter;
import com.microsoft.azure.cosmosdb.SqlParameterCollection;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/**
 * {@link DocumentStoreWriter} implementation that persists timeline documents
 * through an {@link AsyncDocumentClient}.
 *
 * <p>The client is held in a static field and is therefore shared by every
 * instance of this class; it is created by the first instance constructed
 * while the field is {@code null} and is released by {@link #close()}. Each
 * instance additionally owns a fixed-size thread pool (one thread per
 * {@link CollectionType} value) on which upsert calls are subscribed.
 *
 * @param <TimelineDoc> concrete {@link TimelineDocument} type handled by this writer
 */
public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
implements DocumentStoreWriter<TimelineDoc> {
    private static final Logger LOG = LoggerFactory.getLogger(CosmosDBDocumentStoreWriter.class);
    private final String databaseName;
    private static final PerNodeAggTimelineCollectorMetrics METRICS = PerNodeAggTimelineCollectorMetrics.getInstance();
    /** Guards creation and disposal of the static {@link #client}. */
    private static final Object CLIENT_LOCK = new Object();
    /** Client shared by all writer instances; {@code null} until first use and after close. */
    private static AsyncDocumentClient client;
    /** Per-instance pool backing {@link #schedulerForBlockingWork}. */
    private ExecutorService executorService = Executors.newFixedThreadPool(CollectionType.values().length);
    private Scheduler schedulerForBlockingWork = Schedulers.from((Executor)this.executorService);
    private static final String DATABASE_LINK = "/dbs/%s";
    private static final String COLLECTION_LINK = "/dbs/%s/colls/%s";
    private static final String DOCUMENT_LINK = "/dbs/%s/colls/%s/docs/%s";
    private static final String ID = "@id";
    private static final String QUERY_COLLECTION_IF_EXISTS = "SELECT * FROM r where r.id = @id";

    /**
     * Creates a writer, resolving the database name from the configuration and
     * initializing the shared client if it does not exist yet.
     *
     * @param conf configuration passed to {@link DocumentStoreUtils}
     */
    public CosmosDBDocumentStoreWriter(Configuration conf) {
        LOG.info("Initializing Cosmos DB DocumentStoreWriter...");
        this.databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
        this.initCosmosDBClient(conf);
    }

    /**
     * Lazily creates the shared client.
     *
     * <p>The client field is static, so the guard must be a lock common to all
     * instances; an instance-level monitor would let two writers constructed
     * concurrently both observe {@code null} and each create a client.
     *
     * @param conf configuration used to build the client
     */
    private void initCosmosDBClient(Configuration conf) {
        synchronized (CLIENT_LOCK) {
            if (client == null) {
                LOG.info("Creating Cosmos DB Writer Async Client...");
                client = DocumentStoreUtils.createCosmosDBAsyncClient(conf);
                this.addShutdownHook();
            }
        }
    }

    /**
     * Ensures the configured database exists, blocking until the check (and
     * creation, if needed) has completed.
     *
     * <p>The database is read first. If the read fails with a
     * {@link DocumentClientException} whose status code is 404 the database is
     * created; any other failure is logged and propagated.
     */
    @Override
    public void createDatabase() {
        Observable databaseReadObs = client.readDatabase(String.format(DATABASE_LINK, this.databaseName), null);
        Observable databaseExistenceObs = databaseReadObs.doOnNext(databaseResourceResponse -> LOG.info("Database {} already exists.", (Object)this.databaseName)).onErrorResumeNext(throwable -> {
            DocumentClientException de;
            if (throwable instanceof DocumentClientException && (de = (DocumentClientException)throwable).getStatusCode() == 404) {
                LOG.info("Creating new Database : {}", (Object)this.databaseName);
                Database dbDefinition = new Database();
                dbDefinition.setId(this.databaseName);
                return client.createDatabase(dbDefinition, null);
            }
            LOG.error("Reading database : {} if it exists failed.", (Object)this.databaseName, throwable);
            return Observable.error((Throwable)throwable);
        });
        databaseExistenceObs.toCompletable().await();
    }

    /**
     * Ensures a collection with the given name exists in the configured
     * database, blocking until done.
     *
     * <p>The database is queried for a collection whose id equals
     * {@code collectionName}; the collection is created only when the returned
     * page has no results. Errors are logged and then surface from the
     * blocking await.
     *
     * @param collectionName id of the collection to create if absent
     */
    @Override
    public void createCollection(String collectionName) {
        LOG.info("Creating Timeline Collection : {} for Database : {}", (Object)collectionName, (Object)this.databaseName);
        client.queryCollections(String.format(DATABASE_LINK, this.databaseName), new SqlQuerySpec(QUERY_COLLECTION_IF_EXISTS, new SqlParameterCollection(new SqlParameter[]{new SqlParameter(ID, (Object)collectionName)})), null).single().flatMap(page -> {
            if (page.getResults().isEmpty()) {
                DocumentCollection collection = new DocumentCollection();
                collection.setId(collectionName);
                LOG.info("Creating collection {}", (Object)collectionName);
                return client.createCollection(String.format(DATABASE_LINK, this.databaseName), collection, null);
            }
            LOG.info("Collection {} already exists.", (Object)collectionName);
            return Observable.empty();
        }).doOnError(throwable -> LOG.error("Unable to create collection : {}", (Object)collectionName, throwable)).toCompletable().await();
    }

    /**
     * Upserts the given document into the collection for {@code collectionType}.
     *
     * <p>Any exception raised by the upsert is logged and not rethrown. The
     * elapsed time is always reported to the put-entities latency metric
     * together with a flag saying whether the upsert completed without an
     * exception.
     *
     * @param timelineDoc document to write
     * @param collectionType collection the document belongs to
     */
    @Override
    public void writeDocument(TimelineDoc timelineDoc, CollectionType collectionType) {
        // Argument order follows the placeholders: collection, entity type, database.
        LOG.debug("Upserting document under collection : {} with  entity type : {} under Database {}", new Object[]{collectionType.getCollectionName(), timelineDoc.getType(), this.databaseName});
        boolean succeeded = false;
        long startTime = Time.monotonicNow();
        try {
            this.upsertDocument(collectionType, timelineDoc);
            succeeded = true;
        }
        catch (Exception e) {
            LOG.error("Unable to perform upsert for Document Id : {} under Collection : {} under Database {}", new Object[]{timelineDoc.getId(), collectionType.getCollectionName(), this.databaseName, e});
        }
        finally {
            long latency = Time.monotonicNow() - startTime;
            METRICS.addPutEntitiesLatency(latency, succeeded);
        }
    }

    /**
     * Merges the document with the currently stored version (if any) and
     * upserts the result, using the stored version's ETag as an
     * {@code IfMatch} access condition.
     *
     * <p>If the response status code is 409 the whole read-merge-upsert
     * sequence is repeated with the merged document.
     * NOTE(review): the retry is unbounded recursion, and when no previous
     * document exists the access condition is an empty string; confirm both
     * against the SDK's behavior.
     *
     * @param collectionType collection the document belongs to
     * @param timelineDoc document carrying the new state
     */
    private void upsertDocument(CollectionType collectionType, TimelineDoc timelineDoc) {
        String collectionLink = String.format(COLLECTION_LINK, this.databaseName, collectionType.getCollectionName());
        RequestOptions requestOptions = new RequestOptions();
        AccessCondition accessCondition = new AccessCondition();
        // Filled by fetchLatestDoc (via applyUpdatesOnPrevDoc) with the stored document's ETag.
        StringBuilder eTagStrBuilder = new StringBuilder();
        TimelineDoc updatedTimelineDoc = this.applyUpdatesOnPrevDoc(collectionType, timelineDoc, eTagStrBuilder);
        accessCondition.setCondition(eTagStrBuilder.toString());
        accessCondition.setType(AccessConditionType.IfMatch);
        requestOptions.setAccessCondition(accessCondition);
        ResourceResponse resourceResponse = (ResourceResponse)client.upsertDocument(collectionLink, updatedTimelineDoc, requestOptions, true).subscribeOn(this.schedulerForBlockingWork).doOnError(throwable -> LOG.error("Error while upserting Collection : {} with Doc Id : {} under Database : {}", new Object[]{collectionType.getCollectionName(), updatedTimelineDoc.getId(), this.databaseName, throwable})).toBlocking().single();
        if (resourceResponse.getStatusCode() == 409) {
            LOG.warn("There was a conflict while upserting, hence retrying...", (Object)resourceResponse);
            this.upsertDocument(collectionType, updatedTimelineDoc);
        } else if (resourceResponse.getStatusCode() >= 200 && resourceResponse.getStatusCode() < 300) {
            LOG.debug("Successfully wrote doc with id : {} and type : {} under Database : {}", new Object[]{timelineDoc.getId(), timelineDoc.getType(), this.databaseName});
        }
    }

    /**
     * Returns the document that should be written: the stored document with
     * {@code timelineDoc} merged into it, or {@code timelineDoc} itself when
     * no stored document could be fetched.
     *
     * @param collectionType collection the document belongs to
     * @param timelineDoc document carrying the new state
     * @param eTagStrBuilder receives the stored document's ETag, if one was fetched
     * @return the document to upsert
     */
    @VisibleForTesting
    TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType, TimelineDoc timelineDoc, StringBuilder eTagStrBuilder) {
        TimelineDoc prevDocument = this.fetchLatestDoc(collectionType, timelineDoc.getId(), eTagStrBuilder);
        if (prevDocument != null) {
            prevDocument.merge(timelineDoc);
            timelineDoc = prevDocument;
        }
        return timelineDoc;
    }

    /**
     * Reads the stored document with the given id and converts it to the
     * document class matching {@code collectionType}
     * ({@link FlowRunDocument}, {@link FlowActivityDocument}, otherwise
     * {@link TimelineEntityDocument}).
     *
     * <p>Every exception raised while reading or converting is treated as
     * "no previous document": it is reported at debug level only and
     * {@code null} is returned.
     *
     * @param collectionType collection to read from
     * @param documentId id of the document to read
     * @param eTagStrBuilder receives the ETag of the document that was read
     * @return the stored document, or {@code null} if it could not be fetched
     */
    @VisibleForTesting
    TimelineDoc fetchLatestDoc(CollectionType collectionType, String documentId, StringBuilder eTagStrBuilder) {
        String documentLink = String.format(DOCUMENT_LINK, this.databaseName, collectionType.getCollectionName(), documentId);
        try {
            TimelineDocument timelineDoc;
            Document latestDocument = (Document)((ResourceResponse)client.readDocument(documentLink, new RequestOptions()).toBlocking().single()).getResource();
            switch (collectionType) {
                case FLOW_RUN: {
                    timelineDoc = (TimelineDocument)latestDocument.toObject(FlowRunDocument.class);
                    break;
                }
                case FLOW_ACTIVITY: {
                    timelineDoc = (TimelineDocument)latestDocument.toObject(FlowActivityDocument.class);
                    break;
                }
                default: {
                    timelineDoc = (TimelineDocument)latestDocument.toObject(TimelineEntityDocument.class);
                }
            }
            // Appended only after a successful conversion so a failed read leaves the builder empty.
            eTagStrBuilder.append(latestDocument.getETag());
            return (TimelineDoc)timelineDoc;
        }
        catch (Exception e) {
            LOG.debug("No previous Document found with id : {} for Collection : {} under Database : {}", new Object[]{documentId, collectionType.getCollectionName(), this.databaseName});
            return null;
        }
    }

    /**
     * Closes the shared client (if still open) and shuts down this instance's
     * thread pool.
     *
     * <p>The client is static, so closing it here affects every writer
     * instance. The executor is per-instance and is released here because the
     * JVM shutdown hook is only registered by the instance that created the
     * client and so does not cover other instances.
     */
    @Override
    public void close() {
        synchronized (CLIENT_LOCK) {
            if (client != null) {
                LOG.info("Closing Cosmos DB Writer Async Client...");
                client.close();
                client = null;
            }
        }
        if (this.executorService != null) {
            // shutdown() lets already-submitted work finish and is harmless when repeated.
            this.executorService.shutdown();
        }
    }

    /**
     * Registers a JVM shutdown hook that shuts down this instance's executor.
     * Invoked only by the instance that creates the shared client.
     */
    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
        }));
    }
}

