package org.apache.sqoop.framework;

import java.util.Date;
import java.util.Iterator;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.utils.ClassUtils;
import org.json.simple.JSONValue;

/* loaded from: input_file:org/apache/sqoop/framework/JobManager.class */
public class JobManager implements Reconfigurable {
    private static final Logger LOG = Logger.getLogger(JobManager.class);
    private static JobManager instance = new JobManager();
    private static final long DEFAULT_PURGE_THRESHOLD = 86400000;
    private static final long DEFAULT_PURGE_SLEEP = 86400000;
    private static final long DEFAULT_UPDATE_SLEEP = 300000;
    private SubmissionEngine submissionEngine;
    private ExecutionEngine executionEngine;
    private PurgeThread purgeThread = null;
    private UpdateThread updateThread = null;
    private boolean running = true;
    private long purgeThreshold;
    private long purgeSleep;
    private long updateSleep;
    private String notificationBaseUrl;

    /* renamed from: org.apache.sqoop.framework.JobManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/sqoop/framework/JobManager$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$sqoop$model$MJob$Type = new int[MJob.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$sqoop$model$MJob$Type[MJob.Type.IMPORT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$sqoop$model$MJob$Type[MJob.Type.EXPORT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/sqoop/framework/JobManager$PurgeThread.class */
    private class PurgeThread extends Thread {
        public PurgeThread() {
            super("PurgeThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            JobManager.LOG.info("Starting submission manager purge thread");
            while (JobManager.this.running) {
                try {
                    JobManager.LOG.info("Purging old submissions");
                    RepositoryManager.getInstance().getRepository().purgeSubmissions(new Date(new Date().getTime() - JobManager.this.purgeThreshold));
                    Thread.sleep(JobManager.this.purgeSleep);
                } catch (InterruptedException e) {
                    JobManager.LOG.debug("Purge thread interrupted", e);
                }
            }
            JobManager.LOG.info("Ending submission manager purge thread");
        }
    }

    /* loaded from: input_file:org/apache/sqoop/framework/JobManager$UpdateThread.class */
    private class UpdateThread extends Thread {
        public UpdateThread() {
            super("UpdateThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            JobManager.LOG.info("Starting submission manager update thread");
            while (JobManager.this.running) {
                try {
                    JobManager.LOG.debug("Updating running submissions");
                    Iterator<MSubmission> it = RepositoryManager.getInstance().getRepository().findSubmissionsUnfinished().iterator();
                    while (it.hasNext()) {
                        JobManager.this.update(it.next());
                    }
                    Thread.sleep(JobManager.this.updateSleep);
                } catch (InterruptedException e) {
                    JobManager.LOG.debug("Purge thread interrupted", e);
                }
            }
            JobManager.LOG.info("Ending submission manager update thread");
        }
    }

    public static JobManager getInstance() {
        return instance;
    }

    public static void setInstance(JobManager jobManager) {
        instance = jobManager;
    }

    public void setNotificationBaseUrl(String str) {
        LOG.debug("Setting notification base URL to " + str);
        this.notificationBaseUrl = str;
    }

    public String getNotificationBaseUrl() {
        return this.notificationBaseUrl;
    }

    public synchronized void destroy() {
        LOG.trace("Begin submission engine manager destroy");
        this.running = false;
        try {
            this.purgeThread.interrupt();
            this.purgeThread.join();
        } catch (InterruptedException e) {
            LOG.error("Interrupted joining purgeThread");
        }
        try {
            this.updateThread.interrupt();
            this.updateThread.join();
        } catch (InterruptedException e2) {
            LOG.error("Interrupted joining updateThread");
        }
        if (this.submissionEngine != null) {
            this.submissionEngine.destroy();
        }
        if (this.executionEngine != null) {
            this.executionEngine.destroy();
        }
    }

    public synchronized void initialize() {
        LOG.trace("Begin submission engine manager initialization");
        ImmutableContext context = SqoopConfiguration.getInstance().getContext();
        String string = context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
        this.submissionEngine = (SubmissionEngine) ClassUtils.instantiate(string, new Object[0]);
        if (this.submissionEngine == null) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0001, string);
        }
        this.submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
        String string2 = context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
        this.executionEngine = (ExecutionEngine) ClassUtils.instantiate(string2, new Object[0]);
        if (this.executionEngine == null) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0007, string2);
        }
        if (!this.submissionEngine.isExecutionEngineSupported(this.executionEngine.getClass())) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0008);
        }
        this.executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
        this.purgeThreshold = context.getLong(FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, 86400000L);
        this.purgeSleep = context.getLong(FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, 86400000L);
        this.purgeThread = new PurgeThread();
        this.purgeThread.start();
        this.updateSleep = context.getLong(FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, DEFAULT_UPDATE_SLEEP);
        this.updateThread = new UpdateThread();
        this.updateThread.start();
        SqoopConfiguration.getInstance().getProvider().registerListener(new SqoopConfiguration.CoreConfigurationListener(this));
        LOG.info("Submission manager initialized: OK");
    }

    public MSubmission submit(long j, HttpEventContext httpEventContext) {
        String username = httpEventContext.getUsername();
        Repository repository = RepositoryManager.getInstance().getRepository();
        MJob findJob = repository.findJob(j);
        if (findJob == null) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id " + j);
        }
        if (!findJob.getEnabled()) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + findJob.getPersistenceId());
        }
        MConnection findConnection = repository.findConnection(findJob.getConnectionId());
        if (!findConnection.getEnabled()) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: " + findConnection.getPersistenceId());
        }
        SqoopConnector connector = ConnectorManager.getInstance().getConnector(findJob.getConnectorId());
        Object instantiate = ClassUtils.instantiate(connector.getConnectionConfigurationClass(), new Object[0]);
        FormUtils.fromForms(findConnection.getConnectorPart().getForms(), instantiate);
        Object instantiate2 = ClassUtils.instantiate(connector.getJobConfigurationClass(findJob.getType()), new Object[0]);
        FormUtils.fromForms(findJob.getConnectorPart().getForms(), instantiate2);
        Object instantiate3 = ClassUtils.instantiate(FrameworkManager.getInstance().getConnectionConfigurationClass(), new Object[0]);
        FormUtils.fromForms(findConnection.getFrameworkPart().getForms(), instantiate3);
        Object instantiate4 = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass(findJob.getType()), new Object[0]);
        FormUtils.fromForms(findJob.getFrameworkPart().getForms(), instantiate4);
        MSubmission mSubmission = new MSubmission(j);
        SubmissionRequest createSubmissionRequest = this.executionEngine.createSubmissionRequest();
        mSubmission.setCreationUser(username);
        mSubmission.setLastUpdateUser(username);
        createSubmissionRequest.setSummary(mSubmission);
        createSubmissionRequest.setConnector(connector);
        createSubmissionRequest.setConfigConnectorConnection(instantiate);
        createSubmissionRequest.setConfigConnectorJob(instantiate2);
        createSubmissionRequest.setConfigFrameworkConnection(instantiate3);
        createSubmissionRequest.setConfigFrameworkJob(instantiate4);
        createSubmissionRequest.setJobType(findJob.getType());
        createSubmissionRequest.setJobName(findJob.getName());
        createSubmissionRequest.setJobId(findJob.getPersistenceId());
        createSubmissionRequest.setNotificationUrl(this.notificationBaseUrl + j);
        createSubmissionRequest.addJarForClass(MapContext.class);
        createSubmissionRequest.addJarForClass(FrameworkManager.class);
        createSubmissionRequest.addJarForClass(SqoopConnector.class);
        createSubmissionRequest.addJarForClass(this.executionEngine.getClass());
        createSubmissionRequest.addJarForClass(connector.getClass());
        createSubmissionRequest.addJarForClass(JSONValue.class);
        switch (AnonymousClass1.$SwitchMap$org$apache$sqoop$model$MJob$Type[findJob.getType().ordinal()]) {
            case 1:
                createSubmissionRequest.setConnectorCallbacks(connector.getImporter());
                break;
            case 2:
                createSubmissionRequest.setConnectorCallbacks(connector.getExporter());
                break;
            default:
                throw new SqoopException(FrameworkError.FRAMEWORK_0005, "Unsupported job type " + findJob.getType().name());
        }
        LOG.debug("Using callbacks: " + createSubmissionRequest.getConnectorCallbacks());
        Class initializer = createSubmissionRequest.getConnectorCallbacks().getInitializer();
        Initializer initializer2 = (Initializer) ClassUtils.instantiate(initializer, new Object[0]);
        if (initializer2 == null) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0006, "Can't create initializer instance: " + initializer.getName());
        }
        InitializerContext initializerContext = new InitializerContext(createSubmissionRequest.getConnectorContext());
        initializer2.initialize(initializerContext, createSubmissionRequest.getConfigConnectorConnection(), createSubmissionRequest.getConfigConnectorJob());
        createSubmissionRequest.addJars(initializer2.getJars(initializerContext, createSubmissionRequest.getConfigConnectorConnection(), createSubmissionRequest.getConfigConnectorJob()));
        createSubmissionRequest.getSummary().setConnectorSchema(initializer2.getSchema(initializerContext, createSubmissionRequest.getConfigConnectorConnection(), createSubmissionRequest.getConfigConnectorJob()));
        switch (AnonymousClass1.$SwitchMap$org$apache$sqoop$model$MJob$Type[findJob.getType().ordinal()]) {
            case 1:
                prepareImportSubmission(createSubmissionRequest);
                break;
            case 2:
                prepareExportSubmission(createSubmissionRequest);
                break;
            default:
                throw new SqoopException(FrameworkError.FRAMEWORK_0005, "Unsupported job type " + findJob.getType().name());
        }
        synchronized (getClass()) {
            MSubmission findSubmissionLastForJob = repository.findSubmissionLastForJob(j);
            if (findSubmissionLastForJob != null && findSubmissionLastForJob.getStatus().isRunning()) {
                throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + j);
            }
            if (!this.submissionEngine.submit(createSubmissionRequest)) {
                destroySubmission(createSubmissionRequest);
                mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
            }
            repository.createSubmission(mSubmission);
        }
        return mSubmission;
    }

    private void prepareImportSubmission(SubmissionRequest submissionRequest) {
        ImportJobConfiguration importJobConfiguration = (ImportJobConfiguration) submissionRequest.getConfigFrameworkJob();
        submissionRequest.setOutputDirectory(importJobConfiguration.output.outputDirectory);
        submissionRequest.setExtractors(importJobConfiguration.throttling.extractors);
        submissionRequest.setLoaders(importJobConfiguration.throttling.loaders);
        this.executionEngine.prepareImportSubmission(submissionRequest);
    }

    private void prepareExportSubmission(SubmissionRequest submissionRequest) {
        ExportJobConfiguration exportJobConfiguration = (ExportJobConfiguration) submissionRequest.getConfigFrameworkJob();
        submissionRequest.setExtractors(exportJobConfiguration.throttling.extractors);
        submissionRequest.setLoaders(exportJobConfiguration.throttling.loaders);
        this.executionEngine.prepareExportSubmission(submissionRequest);
    }

    private void destroySubmission(SubmissionRequest submissionRequest) {
        Class destroyer = submissionRequest.getConnectorCallbacks().getDestroyer();
        Destroyer destroyer2 = (Destroyer) ClassUtils.instantiate(destroyer, new Object[0]);
        if (destroyer2 == null) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0006, "Can't create destroyer instance: " + destroyer.getName());
        }
        destroyer2.destroy(new DestroyerContext(submissionRequest.getConnectorContext(), false, submissionRequest.getSummary().getConnectorSchema()), submissionRequest.getConfigConnectorConnection(), submissionRequest.getConfigConnectorJob());
    }

    public MSubmission stop(long j, HttpEventContext httpEventContext) {
        String username = httpEventContext.getUsername();
        MSubmission findSubmissionLastForJob = RepositoryManager.getInstance().getRepository().findSubmissionLastForJob(j);
        if (findSubmissionLastForJob == null || !findSubmissionLastForJob.getStatus().isRunning()) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + j + " is not running");
        }
        this.submissionEngine.stop(findSubmissionLastForJob.getExternalId());
        findSubmissionLastForJob.setLastUpdateUser(username);
        update(findSubmissionLastForJob);
        return findSubmissionLastForJob;
    }

    public MSubmission status(long j) {
        MSubmission findSubmissionLastForJob = RepositoryManager.getInstance().getRepository().findSubmissionLastForJob(j);
        if (findSubmissionLastForJob == null) {
            return new MSubmission(j, new Date(), SubmissionStatus.NEVER_EXECUTED);
        }
        if (findSubmissionLastForJob.getStatus().isRunning()) {
            update(findSubmissionLastForJob);
        }
        return findSubmissionLastForJob;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void update(MSubmission mSubmission) {
        double d = -1.0d;
        Counters counters = null;
        String externalId = mSubmission.getExternalId();
        SubmissionStatus status = this.submissionEngine.status(externalId);
        String externalLink = this.submissionEngine.externalLink(externalId);
        if (status.isRunning()) {
            d = this.submissionEngine.progress(externalId);
        } else {
            counters = this.submissionEngine.counters(externalId);
        }
        mSubmission.setStatus(status);
        mSubmission.setProgress(d);
        mSubmission.setCounters(counters);
        mSubmission.setExternalLink(externalLink);
        mSubmission.setLastUpdateDate(new Date());
        RepositoryManager.getInstance().getRepository().updateSubmission(mSubmission);
    }

    @Override // org.apache.sqoop.core.Reconfigurable
    public synchronized void configurationChanged() {
        LOG.info("Begin submission engine manager reconfiguring");
        MapContext context = SqoopConfiguration.getInstance().getContext();
        MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
        String string = context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
        if (string == null || string.trim().length() == 0) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0001, string);
        }
        if (!string.equals(oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE))) {
            LOG.warn("Submission engine cannot be replaced at the runtime. You might need to restart the server.");
        }
        String string2 = context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
        if (string2 == null || string2.trim().length() == 0) {
            throw new SqoopException(FrameworkError.FRAMEWORK_0007, string2);
        }
        if (!string2.equals(oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE))) {
            LOG.warn("Execution engine cannot be replaced at the runtime. You might need to restart the server.");
        }
        this.purgeThreshold = context.getLong(FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, 86400000L);
        this.purgeSleep = context.getLong(FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, 86400000L);
        this.purgeThread.interrupt();
        this.updateSleep = context.getLong(FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, DEFAULT_UPDATE_SLEEP);
        this.updateThread.interrupt();
        LOG.info("Submission engine manager reconfigured.");
    }
}
