package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TextOutputForTests;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.class */
public class TestManifestCommitProtocol extends AbstractManifestCommitterTest {
    /** Subdirectory name constant; not referenced in the visible part of this class. */
    private static final String SUB_DIR = "SUB_DIR";

    /** Filename prefix that {@code getPart0000} uses to locate the task output. */
    protected static final String PART_00000 = "part-m-00000";

    /** Job ID obtained from the test ID generator in the constructor. */
    private final String jobId;

    /** String form of task attempt (0, 0). */
    private final String attempt0;

    /** Typed ID of task attempt (0, 0). */
    private final TaskAttemptID taskAttempt0;

    /** Typed ID of task attempt (0, 1). */
    private final TaskAttemptID taskAttempt1;

    /** String form of task attempt (0, 1). */
    private final String attempt1;

    /** Destination directory of the current test case; assigned in {@code setup()}. */
    private Path outputDir;

    private static final Logger LOG = LoggerFactory.getLogger(TestManifestCommitProtocol.class);

    /* Keys and values written by writeOutput()/writeMapFileOutput(). */
    private static final Text KEY_1 = new Text("key1");
    private static final Text KEY_2 = new Text("key2");
    private static final Text VAL_1 = new Text("val1");
    private static final Text VAL_2 = new Text("val2");

    /** Statistics aggregated across all test cases in teardown and logged after the class. */
    private static final IOStatisticsSnapshot IOSTATISTICS = IOStatisticsSupport.snapshotIOStatistics();

    /** Accepts only paths whose final name component starts with neither "_" nor ".". */
    public static final PathFilter HIDDEN_FILE_FILTER = path -> {
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    };

    /** Jobs registered through {@code abortInTeardown(JobData)}; aborted in {@code teardown()}. */
    private final List<JobData> abortInTeardown = new ArrayList<>(1);

    /** Factory that delegates to this test instance's {@code createCommitter(TaskAttemptContext)}. */
    private final LocalCommitterFactory localCommitterFactory = new LocalCommitterFactory();

    /**
     * A test action run against a started job; invoked by
     * {@code executeWork(String, JobData, ActionToTest)} with the job's
     * contexts and committer.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol$ActionToTest.class */
    public interface ActionToTest {
        /**
         * Execute the action.
         * @param job the job
         * @param jobContext job context
         * @param taskAttemptContext task attempt context
         * @param manifestCommitter committer under test
         * @throws Exception on any failure
         */
        void exec(Job job, JobContext jobContext, TaskAttemptContext taskAttemptContext, ManifestCommitter manifestCommitter) throws Exception;
    }

    /**
     * Factory for the committer used by a job started through
     * {@code startJob(CommitterFactory, boolean)}.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol$CommitterFactory.class */
    public interface CommitterFactory {
        /**
         * Create a committer for a task attempt.
         * @param taskAttemptContext task attempt context
         * @return a new committer
         * @throws IOException on a failure to create it
         */
        ManifestCommitter createCommitter(TaskAttemptContext taskAttemptContext) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Bundle of everything created when a test job is started: the job, its
     * job and task attempt contexts, the committer and the job configuration.
     */
    public static final class JobData {
        /** The job. */
        private final Job job;
        /** Job context. */
        private final JobContext jContext;
        /** Task attempt context. */
        private final TaskAttemptContext tContext;
        /** Committer bound to the task attempt. */
        private final ManifestCommitter committer;
        /** Configuration taken from the job at construction time. */
        private final Configuration conf;
        /** Path of the text output, set by startJob() only when output was written. */
        private Path writtenTextPath;

        /**
         * Capture the state of a started job.
         * @param job the job; its configuration is cached in {@code conf}
         * @param jobContext job context
         * @param taskAttemptContext task attempt context
         * @param manifestCommitter committer for the task attempt
         */
        public JobData(Job job, JobContext jobContext, TaskAttemptContext taskAttemptContext, ManifestCommitter manifestCommitter) {
            this.job = job;
            jContext = jobContext;
            tContext = taskAttemptContext;
            committer = manifestCommitter;
            conf = job.getConfiguration();
        }

        /**
         * @return the unique job ID reported by the committer.
         */
        public String jobId() {
            return committer.getJobUniqueId();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol$LocalCommitterFactory.class */
    /**
     * Committer factory which hands creation back to the enclosing test
     * instance, so subclasses overriding
     * {@code createCommitter(TaskAttemptContext)} are picked up.
     */
    public class LocalCommitterFactory implements CommitterFactory {
        protected LocalCommitterFactory() {
        }

        @Override // org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol.CommitterFactory
        public ManifestCommitter createCommitter(TaskAttemptContext taskAttemptContext) throws IOException {
            // The outer-instance qualifier is required: an unqualified call would
            // resolve to this very method and recurse forever.
            return TestManifestCommitProtocol.this.createCommitter(taskAttemptContext);
        }
    }

    /**
     * Recursively delete the output directory, if one has been assigned.
     * @throws IOException if the delete call fails
     */
    private void cleanupOutputDir() throws IOException {
        if (outputDir == null) {
            // nothing was ever set up
            return;
        }
        getFileSystem().delete(outputDir, true);
    }

    public TestManifestCommitProtocol() {
        ManifestCommitterTestSupport.JobAndTaskIDsForTests jobAndTaskIDsForTests = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);
        this.jobId = jobAndTaskIDsForTests.getJobId();
        this.attempt0 = jobAndTaskIDsForTests.getTaskAttempt(0, 0);
        this.taskAttempt0 = jobAndTaskIDsForTests.getTaskAttemptIdType(0, 0);
        this.attempt1 = jobAndTaskIDsForTests.getTaskAttempt(0, 1);
        this.taskAttempt1 = jobAndTaskIDsForTests.getTaskAttemptIdType(0, 1);
    }

    /**
     * @return the suite name, used by {@code getMethodName()} as a prefix.
     */
    protected String suitename() {
        return "TestManifestCommitProtocolLocalFS";
    }

    /**
     * @return the logger of this test class.
     */
    public Logger log() {
        return LOG;
    }

    /**
     * @return the superclass's method name prefixed with the suite name and a hyphen.
     */
    protected String getMethodName() {
        final String prefix = suitename();
        final String method = super.getMethodName();
        return prefix + "-" + method;
    }

    /**
     * Runs the superclass setup, then binds the output directory to a path
     * derived from the method name and deletes anything already there.
     * @throws Exception on any setup failure
     */
    @Override
    public void setup() throws Exception {
        super.setup();
        final String testName = getMethodName();
        outputDir = path(testName);
        cleanupOutputDir();
    }

    /**
     * Aborts every job registered for teardown, folds each committer's
     * IOStatistics into the class-wide snapshot, deletes the output
     * directory (logging rather than propagating an IOException) and
     * finally runs the superclass teardown.
     * @throws Exception on a failure in the superclass teardown
     */
    @Override
    public void teardown() throws Exception {
        describe("teardown");
        Thread.currentThread().setName("teardown");
        abortInTeardown.forEach(data -> {
            abortJobQuietly(data);
            IOSTATISTICS.aggregate(data.committer.getIOStatistics());
        });
        try {
            cleanupOutputDir();
        } catch (IOException ioe) {
            // best effort: a failed cleanup must not fail the test
            log().info("Exception during cleanup", ioe);
        }
        super.teardown();
    }

    /**
     * Logs the IOStatistics aggregated over all test cases.
     */
    @AfterClass
    public static void logAggregateIOStatistics() {
        final String stats = IOStatisticsLogging.ioStatisticsToPrettyString(IOSTATISTICS);
        LOG.info("Final IOStatistics {}", stats);
    }

    /**
     * Register a job to be aborted during {@code teardown()}.
     * @param jobData job to abort
     */
    protected void abortInTeardown(JobData jobData) {
        abortInTeardown.add(jobData);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the superclass configuration and binds the committer factory into it.
     * @return the patched configuration
     */
    @Override
    public Configuration createConfiguration() {
        final Configuration conf = super.createConfiguration();
        bindCommitter(conf);
        return conf;
    }

    /**
     * Set the output committer factory option to the manifest committer factory.
     * @param configuration configuration to patch
     */
    protected void bindCommitter(Configuration configuration) {
        configuration.set("mapreduce.outputcommitter.factory.class", ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY);
    }

    /**
     * Create a committer targeting the test's output directory.
     * @param taskAttemptContext task attempt context
     * @return a new committer
     * @throws IOException on a failure to create it
     */
    protected ManifestCommitter createCommitter(TaskAttemptContext taskAttemptContext) throws IOException {
        final Path destination = getOutputDir();
        return createCommitter(destination, taskAttemptContext);
    }

    /**
     * Create a committer for the given destination.
     * @param path output path
     * @param taskAttemptContext task attempt context
     * @return a new {@code ManifestCommitter}
     * @throws IOException on a failure to create it
     */
    protected ManifestCommitter createCommitter(Path path, TaskAttemptContext taskAttemptContext) throws IOException {
        return new ManifestCommitter(path, taskAttemptContext);
    }

    /**
     * @return the output directory of the current test case; null before setup.
     */
    protected Path getOutputDir() {
        return outputDir;
    }

    /**
     * @return the job ID created in the constructor.
     */
    protected String getJobId() {
        return jobId;
    }

    /**
     * @return the string form of task attempt (0, 0).
     */
    protected String getAttempt0() {
        return attempt0;
    }

    /**
     * @return the typed ID of task attempt (0, 0).
     */
    protected TaskAttemptID getTaskAttempt0() {
        return taskAttempt0;
    }

    /**
     * @return the string form of task attempt (0, 1).
     */
    protected String getAttempt1() {
        return attempt1;
    }

    /**
     * @return the typed ID of task attempt (0, 1).
     */
    protected TaskAttemptID getTaskAttempt1() {
        return taskAttempt1;
    }

    /**
     * Assert that the job configuration names the manifest committer factory
     * and that the factory resolved for the path is a {@code ManifestCommitterFactory}.
     * @param jobContext context supplying the configuration
     * @param path output path to resolve the factory for
     */
    protected void assertCommitterFactoryIsManifestCommitter(JobContext jobContext, Path path) {
        final Configuration conf = jobContext.getConfiguration();
        assertConfigurationUsesManifestCommitter(conf);
        final PathOutputCommitterFactory factory = PathOutputCommitterFactory.getCommitterFactory(path, conf);
        final String factoryName = conf.get("mapreduce.outputcommitter.factory.class", "");
        Assertions.assertThat(factory)
            .describedAs("Committer for output path %s and factory name \"%s\"", path, factoryName)
            .isInstanceOf(ManifestCommitterFactory.class);
    }

    /**
     * Assert that the committer factory option is set to the manifest committer factory.
     * @param configuration configuration to inspect
     */
    private void assertConfigurationUsesManifestCommitter(Configuration configuration) {
        final String configured = configuration.get("mapreduce.outputcommitter.factory.class", (String) null);
        Assertions.assertThat(configured)
            .describedAs("Value of %s", "mapreduce.outputcommitter.factory.class")
            .isEqualTo(ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY);
    }

    /**
     * Write the standard text records for a task attempt through a
     * {@code TextOutputForTests} record writer, timing the operation.
     * @param taskAttemptContext task attempt to write for
     * @return the destination path reported by the record writer
     * @throws IOException on a write failure
     * @throws InterruptedException if the write is interrupted
     */
    protected Path writeTextOutput(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        describe("write output");
        // try-with-resources closes the timer exactly once, on success or failure
        try (DurationInfo ignored = new DurationInfo(LOG, "Writing Text output for task %s", taskAttemptContext.getTaskAttemptID())) {
            TextOutputForTests.LoggingLineRecordWriter writer = new TextOutputForTests().m39getRecordWriter(taskAttemptContext);
            writeOutput(writer, taskAttemptContext);
            return writer.getDest();
        }
    }

    /**
     * Write a fixed sequence of eight key/value combinations (including null
     * and {@code NullWritable} keys and values) and close the writer.
     * {@code validateContent} checks the resulting file text.
     * @param recordWriter writer to write to
     * @param taskAttemptContext task attempt context used to close the writer
     * @throws IOException on a write failure
     * @throws InterruptedException if the write is interrupted
     */
    private void writeOutput(RecordWriter<Writable, Object> recordWriter, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        NullWritable nullWritable = NullWritable.get();
        // The CloseWriter resource is closed exactly once, even if a write fails.
        try (ManifestCommitterTestSupport.CloseWriter ignored = new ManifestCommitterTestSupport.CloseWriter(recordWriter, taskAttemptContext)) {
            recordWriter.write(KEY_1, VAL_1);
            recordWriter.write(null, nullWritable);
            recordWriter.write(null, VAL_1);
            recordWriter.write(nullWritable, VAL_2);
            recordWriter.write(KEY_2, nullWritable);
            recordWriter.write(KEY_1, null);
            recordWriter.write(null, null);
            recordWriter.write(KEY_2, VAL_2);
            recordWriter.close(taskAttemptContext);
        }
    }

    /**
     * Write ten records keyed 0..9 to a map-file record writer, with odd keys
     * mapped to {@code VAL_1} and even keys to {@code VAL_2}, then close the writer.
     * @param recordWriter writer to write to
     * @param taskAttemptContext task attempt context used to close the writer
     * @throws IOException on a write failure
     * @throws InterruptedException if the write is interrupted
     */
    private void writeMapFileOutput(RecordWriter<WritableComparable<?>, Writable> recordWriter, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        describe("\nWrite map output");
        // Resources close in reverse order: the CloseWriter first, then the timer.
        // Both are released even when a write fails.
        try (DurationInfo ignoredTimer = new DurationInfo(LOG, "Writing Text output for task %s", taskAttemptContext.getTaskAttemptID());
             ManifestCommitterTestSupport.CloseWriter ignoredCloser = new ManifestCommitterTestSupport.CloseWriter(recordWriter, taskAttemptContext)) {
            for (int i = 0; i < 10; i++) {
                Text value = (i & 1) == 1 ? VAL_1 : VAL_2;
                recordWriter.write(new LongWritable(i), value);
            }
            LOG.debug("Closing writer {}", recordWriter);
            recordWriter.close(taskAttemptContext);
        }
    }

    /**
     * Create a job for the test output directory, test configuration and attempt 0.
     * @return the new job
     * @throws IOException on a failure to create the job
     */
    public Job newJob() throws IOException {
        return newJob(outputDir, getConfiguration(), attempt0);
    }

    /**
     * Create a job whose configuration carries the task attempt ID, has the
     * manifest committer enabled, and whose output path is set.
     * @param path output path
     * @param configuration base configuration
     * @param str task attempt ID string
     * @return the new job
     * @throws IOException on a failure to create the job
     */
    private Job newJob(Path path, Configuration configuration, String str) throws IOException {
        final Job created = Job.getInstance(configuration);
        // patch the job's own configuration, not the one passed in
        final Configuration jobConf = created.getConfiguration();
        jobConf.set("mapreduce.task.attempt.id", str);
        enableManifestCommitter(jobConf);
        FileOutputFormat.setOutputPath(created, path);
        return created;
    }

    /**
     * Start a job using the local committer factory.
     * @param z true to also write the text output for the task
     * @return the data of the started job
     * @throws IOException on an IO failure
     * @throws InterruptedException if writing output is interrupted
     */
    protected JobData startJob(boolean z) throws IOException, InterruptedException {
        return startJob(localCommitterFactory, z);
    }

    /**
     * Create a job, its contexts and committer; set up the job and task;
     * register the job for abort in teardown; optionally write text output.
     * @param committerFactory factory for the committer
     * @param z true to write the text output and record its path in the job data
     * @return the data of the started job
     * @throws IOException on an IO failure
     * @throws InterruptedException if writing output is interrupted
     */
    protected JobData startJob(CommitterFactory committerFactory, boolean z) throws IOException, InterruptedException {
        final Job job = newJob();
        final Configuration conf = job.getConfiguration();
        assertConfigurationUsesManifestCommitter(conf);
        conf.set("mapreduce.task.attempt.id", attempt0);
        conf.setInt("mapreduce.job.application.attempt.id", 1);

        final JobContext jobContext = new JobContextImpl(conf, taskAttempt0.getJobID());
        final TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, taskAttempt0);
        final ManifestCommitter committer = committerFactory.createCommitter(attemptContext);
        final JobData data = new JobData(job, jobContext, attemptContext, committer);

        setupJob(data);
        abortInTeardown(data);
        if (z) {
            data.writtenTextPath = writeTextOutput(attemptContext);
        }
        return data;
    }

    /**
     * Set up the job through the committer (timed), then set up the task attempt.
     * @param jobData job whose committer and contexts are used
     * @throws IOException on a setup failure
     */
    protected void setupJob(JobData jobData) throws IOException {
        ManifestCommitter manifestCommitter = jobData.committer;
        JobContext jobContext = jobData.jContext;
        TaskAttemptContext taskAttemptContext = jobData.tContext;
        describe("\nsetup job");
        // The timer covers only the job setup and is closed exactly once;
        // task setup has its own timer in setupCommitter().
        try (DurationInfo ignored = new DurationInfo(LOG, "setup job %s", jobContext.getJobID())) {
            manifestCommitter.setupJob(jobContext);
        }
        setupCommitter(manifestCommitter, taskAttemptContext);
        describe("setup complete");
    }

    /**
     * Set up a task attempt through the committer, timing the operation.
     * @param manifestCommitter committer to invoke
     * @param taskAttemptContext task attempt to set up
     * @throws IOException on a setup failure
     */
    private void setupCommitter(ManifestCommitter manifestCommitter, TaskAttemptContext taskAttemptContext) throws IOException {
        // try-with-resources closes the timer exactly once, on success or failure
        try (DurationInfo ignored = new DurationInfo(LOG, "setup task %s", taskAttemptContext.getTaskAttemptID())) {
            manifestCommitter.setupTask(taskAttemptContext);
        }
    }

    /**
     * Abort the task and job held in the job data, logging any exception.
     * @param jobData job to abort
     */
    protected void abortJobQuietly(JobData jobData) {
        final ManifestCommitter committer = jobData.committer;
        final JobContext jobContext = jobData.jContext;
        final TaskAttemptContext attemptContext = jobData.tContext;
        abortJobQuietly(committer, jobContext, attemptContext);
    }

    /**
     * Abort the task attempt, then the job (with state KILLED). An exception
     * from either step is logged at warn level and not propagated; the job
     * abort is attempted even if the task abort failed.
     * @param manifestCommitter committer to invoke
     * @param jobContext job to abort
     * @param taskAttemptContext task attempt to abort
     */
    protected void abortJobQuietly(ManifestCommitter manifestCommitter, JobContext jobContext, TaskAttemptContext taskAttemptContext) {
        describe("\naborting task");
        try {
            manifestCommitter.abortTask(taskAttemptContext);
        } catch (Exception taskFailure) {
            log().warn("Exception aborting task:", taskFailure);
        }

        describe("\naborting job");
        try {
            manifestCommitter.abortJob(jobContext, JobStatus.State.KILLED);
        } catch (Exception jobFailure) {
            log().warn("Exception aborting job", jobFailure);
        }
    }

    /**
     * Commit the task attempt and then the job, timing the whole sequence.
     * @param manifestCommitter committer to invoke
     * @param jobContext job to commit
     * @param taskAttemptContext task attempt to commit
     * @throws IOException on a commit failure
     */
    protected void commitTaskAndJob(ManifestCommitter manifestCommitter, JobContext jobContext, TaskAttemptContext taskAttemptContext) throws IOException {
        // try-with-resources closes the timer exactly once, on success or failure
        try (DurationInfo ignored = new DurationInfo(LOG, "committing Job %s", jobContext.getJobID())) {
            describe("\ncommitting task");
            manifestCommitter.commitTask(taskAttemptContext);
            describe("\ncommitting job");
            manifestCommitter.commitJob(jobContext);
            describe("commit complete\n");
        }
    }

    /**
     * Start a job without writing any output and run the action against it.
     * @param str name of the action, used in the timing log
     * @param actionToTest action to run
     * @throws Exception on any failure
     */
    protected void executeWork(String str, ActionToTest actionToTest) throws Exception {
        final JobData started = startJob(false);
        executeWork(str, started, actionToTest);
    }

    /**
     * Run an action against a started job, timing it, and always abort the
     * job quietly afterwards.
     * @param str name of the action, used in the timing log
     * @param jobData job to run the action against
     * @param actionToTest action to run
     * @throws Exception anything raised by the action
     */
    public void executeWork(String str, JobData jobData, ActionToTest actionToTest) throws Exception {
        // The timer is closed before the finally clause runs, whether or not
        // the action throws.
        try (DurationInfo ignored = new DurationInfo(LOG, "Executing %s", str)) {
            actionToTest.exec(jobData.job, jobData.jContext, jobData.tContext, jobData.committer);
        } finally {
            abortJobQuietly(jobData);
        }
    }

    /**
     * Load a task manifest from the test filesystem.
     * @param path path of the manifest
     * @return the loaded manifest
     * @throws IOException on a load failure
     */
    TaskManifest loadManifest(Path path) throws IOException {
        final FileSystem fs = getFileSystem();
        return TaskManifest.load(fs, path);
    }

    /**
     * Commits a task, then creates a second committer for application
     * attempt 2 and verifies that recovery is reported as unsupported, that
     * {@code recoverTask} raises an IOException containing "recover", and
     * that task and job abort on the second committer complete.
     * @throws Exception on any failure
     */
    @Test
    public void testRecoveryAndCleanup() throws Exception {
        describe("Test (unsupported) task recovery.");
        final JobData jobData = startJob(true);
        final TaskAttemptContext tContext = jobData.tContext;
        final ManifestCommitter committer = jobData.committer;

        Assertions.assertThat(committer.getWorkPath())
            .as("null workPath in committer " + committer)
            .isNotNull();
        Assertions.assertThat(committer.getOutputPath())
            .as("null outputPath in committer " + committer)
            .isNotNull();

        commitTask(committer, tContext);
        final TaskManifest manifest = loadManifest(committer.getTaskManifestPath(tContext));
        LOG.info("Manifest {}", manifest);

        // second application attempt of the same job and task attempt
        final Configuration conf2 = jobData.job.getConfiguration();
        conf2.set("mapreduce.task.attempt.id", attempt0);
        conf2.setInt("mapreduce.job.application.attempt.id", 2);
        final JobContextImpl jContext2 = new JobContextImpl(conf2, taskAttempt0.getJobID());
        final TaskAttemptContextImpl tContext2 = new TaskAttemptContextImpl(conf2, taskAttempt0);
        final ManifestCommitter committer2 = createCommitter(tContext2);
        committer2.setupJob(tContext2);

        Assertions.assertThat(committer2.isRecoverySupported())
            .as("recoverySupported in " + committer2)
            .isFalse();
        LambdaTestUtils.intercept(IOException.class, "recover", () -> {
            committer2.recoverTask(tContext2);
        });

        describe("aborting task attempt 2; expect nothing to clean up");
        committer2.abortTask(tContext2);
        describe("Aborting job 2; expect pending commits to be aborted");
        committer2.abortJob(jContext2, JobStatus.State.KILLED);
    }

    /**
     * Assert that the committer's task attempt directory does not exist.
     * @param manifestCommitter committer supplying the path
     * @param taskAttemptContext task attempt context
     * @throws IOException on an IO failure
     */
    protected void assertTaskAttemptPathDoesNotExist(ManifestCommitter manifestCommitter, TaskAttemptContext taskAttemptContext) throws IOException {
        final Path attemptDir = manifestCommitter.getTaskAttemptPath(taskAttemptContext);
        final FileSystem fs = attemptDir.getFileSystem(taskAttemptContext.getConfiguration());
        ContractTestUtils.assertPathDoesNotExist(fs, "task attempt dir", attemptDir);
    }

    /**
     * Assert that the committer's job attempt directory does not exist.
     * @param manifestCommitter committer supplying the path
     * @param jobContext job context
     * @throws IOException on an IO failure
     */
    protected void assertJobAttemptPathDoesNotExist(ManifestCommitter manifestCommitter, JobContext jobContext) throws IOException {
        final Path attemptDir = manifestCommitter.getJobAttemptPath(jobContext);
        final FileSystem fs = attemptDir.getFileSystem(jobContext.getConfiguration());
        ContractTestUtils.assertPathDoesNotExist(fs, "job attempt dir", attemptDir);
    }

    /**
     * List the directory, optionally verify the success marker, and assert
     * that the part file holds exactly the text expected from {@code writeOutput}.
     * @param path output directory
     * @param z true to verify (and return) the success marker
     * @param str job ID passed to the success marker check
     * @return the success data, or null if it was not checked
     * @throws Exception on any failure
     */
    private ManifestSuccessData validateContent(Path path, boolean z, String str) throws Exception {
        lsR(getFileSystem(), path, true);
        ManifestSuccessData successData;
        if (z) {
            successData = verifySuccessMarker(path, str);
        } else {
            successData = null;
        }
        final Path partFile = getPart0000(path);
        log().debug("Validating content in {}", partFile);
        // six lines: the eight writes include two that leave no line in this expected text
        final String expected = KEY_1 + "\t" + VAL_1 + "\n"
            + VAL_1 + "\n"
            + VAL_2 + "\n"
            + KEY_2 + "\n"
            + KEY_1 + "\n"
            + KEY_2 + "\t" + VAL_2 + "\n";
        Assertions.assertThat(readFile(partFile))
            .describedAs("Content of %s", partFile)
            .isEqualTo(expected);
        return successData;
    }

    /**
     * Find the entry under a directory whose name starts with {@code PART_00000}.
     * If the listing does not contain exactly one match, the existence of
     * {@code PART_00000} itself is asserted first.
     * @param path directory to list
     * @return path of the first matching entry
     * @throws Exception on a listing or assertion failure
     */
    protected Path getPart0000(Path path) throws Exception {
        final FileSystem fs = path.getFileSystem(getConfiguration());
        final FileStatus[] matches = fs.listStatus(path, candidate -> candidate.getName().startsWith(PART_00000));
        final boolean exactlyOne = matches.length == 1;
        if (!exactlyOne) {
            // NOTE(review): with several matches this only checks that the exact
            // name exists and then returns the first listed entry — confirm intent.
            ContractTestUtils.assertPathExists(fs, "Output file", new Path(path, PART_00000));
        }
        return matches[0].getPath();
    }

    /**
     * Assert that the part entry under the directory is a non-empty directory
     * containing "index" and "data" entries.
     * @param fileSystem filesystem used to list the part directory
     * @param path output directory
     * @throws Exception on any failure
     */
    private void validateMapFileOutputContent(FileSystem fileSystem, Path path) throws Exception {
        assertPathExists("Map output", path);
        final Path mapDir = getPart0000(path);
        assertPathExists("Map output", mapDir);
        assertIsDirectory(mapDir);
        final FileStatus[] children = fileSystem.listStatus(mapDir);
        Assertions.assertThat(children)
            .as("No files found in " + mapDir)
            .isNotEmpty();
        final Path index = new Path(mapDir, "index");
        assertPathExists("index file in " + mapDir, index);
        final Path data = new Path(mapDir, "data");
        assertPathExists("data file in " + mapDir, data);
    }

    /**
     * Walks the whole commit lifecycle: start a job, write output, commit the
     * task, inspect the task manifest, commit the job, then validate the
     * output content, the success marker and the job report, including their
     * IOStatistics counters.
     * @throws Exception on any failure
     */
    @Test
    public void testCommitLifecycle() throws Exception {
        describe("Full test of the expected lifecycle:\n start job, task, write, commit task, commit job.\nVerify:\n* no files are visible after task commit\n* the expected file is visible after job commit\n");
        // start the job without writing output; that happens explicitly below
        JobData startJob = startJob(false);
        JobContext jobContext = startJob.jContext;
        TaskAttemptContext taskAttemptContext = startJob.tContext;
        ManifestCommitter manifestCommitter = startJob.committer;
        assertCommitterFactoryIsManifestCommitter(taskAttemptContext, taskAttemptContext.getWorkingDirectory());
        validateTaskAttemptWorkingDirectory(manifestCommitter, taskAttemptContext);
        describe("1. Writing output");
        describe("Output written to %s", writeTextOutput(taskAttemptContext));
        describe("2. Committing task");
        Assertions.assertThat(manifestCommitter.needsTaskCommit(taskAttemptContext)).as("No files to commit were found by " + manifestCommitter, new Object[0]).isTrue();
        commitTask(manifestCommitter, taskAttemptContext);
        // the manifest produced by the task commit: one file, no directories to create
        TaskManifest taskManifest = (TaskManifest) Objects.requireNonNull(manifestCommitter.getTaskAttemptCommittedManifest(), "committerTaskManifest");
        String json = taskManifest.toJson();
        LOG.info("Task manifest {}", json);
        Assertions.assertThat(taskManifest.getFilesToCommit()).describedAs("Files to commit in task manifest %s", new Object[]{json}).hasSize(1);
        Assertions.assertThat(taskManifest.getDestDirectories()).describedAs("Directories to create in task manifest %s", new Object[]{json}).isEmpty();
        try {
            // every file listed directly under the output dir is checked against "part";
            // a missing output dir is acceptable at this point and only logged
            RemoteIterators.foreach(getFileSystem().listFiles(this.outputDir, false), locatedFileStatus -> {
                Assertions.assertThat(locatedFileStatus.getPath().toString()).as("task committed file to dest :" + locatedFileStatus, new Object[0]).contains(new CharSequence[]{"part"});
            });
        } catch (FileNotFoundException e) {
            log().info("Outdir {} is not created by task commit phase ", this.outputDir);
        }
        describe("3. Committing job");
        commitJob(manifestCommitter, jobContext);
        describe("4. Validating content");
        String jobId = startJob.jobId();
        // success marker: stage diagnostics and statistics
        ManifestSuccessData validateContent = validateContent(this.outputDir, true, jobId);
        Assertions.assertThat(validateContent.getDiagnostics()).describedAs("Stage entry in SUCCESS", new Object[0]).containsEntry("stage", "committer_commit_job");
        IOStatisticsSnapshot iOStatistics = validateContent.getIOStatistics();
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics, "op_load_manifest", 1L);
        // bytes committed must match the length of the single committed file
        FileStatus fileStatus = getFileSystem().getFileStatus(getPart0000(this.outputDir));
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics, "committer_files_committed", 1);
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics, "committer_bytes_committed", fileStatus.getLen());
        // job report: same checks, plus the commit job counter
        ManifestSuccessData loadReport = loadReport(jobId, true);
        Assertions.assertThat(loadReport.getDiagnostics()).describedAs("Stage entry in report", new Object[0]).containsEntry("stage", "committer_commit_job");
        IOStatisticsSnapshot iOStatistics2 = loadReport.getIOStatistics();
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics2, "op_load_manifest", 1L);
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics2, "committer_commit_job", 1L);
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics2, "committer_files_committed", 1);
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics2, "committer_bytes_committed", fileStatus.getLen());
    }

    /**
     * Load the job summary report from the report directory, log it, and
     * assert that its success flag has the expected value.
     * @param str job ID used to build the report filename
     * @param z expected value of the success flag
     * @return the loaded report
     * @throws IOException on a load failure
     */
    private ManifestSuccessData loadReport(String str, boolean z) throws IOException {
        final String filename = ManifestCommitterSupport.createJobSummaryFilename(str);
        final File reportFile = new File(getReportDir(), filename);
        final FileSystem localFs = FileSystem.getLocal(getConfiguration());
        ContractTestUtils.assertIsFile(localFs, new Path(reportFile.toURI()));
        final ManifestSuccessData report = (ManifestSuccessData) ManifestSuccessData.serializer().load(reportFile);
        LOG.info("Report for job {}:\n{}", str, report.toJson());
        Assertions.assertThat(report.getSuccess())
            .describedAs("success flag in report")
            .isEqualTo(z);
        return report;
    }

    /**
     * Commits the same task attempt twice, commits and cleans up the job,
     * validates the output, then expects a further task commit to raise
     * a FileNotFoundException.
     * @throws Exception on any failure
     */
    @Test
    public void testCommitterWithDuplicatedCommit() throws Exception {
        describe("Call a task then job commit twice;expect the second task commit to fail.");
        final JobData jobData = startJob(true);
        final JobContext jContext = jobData.jContext;
        final TaskAttemptContext tContext = jobData.tContext;
        final ManifestCommitter committer = jobData.committer;

        describe("committing task");
        // both task commits before the job commit are expected to complete
        committer.commitTask(tContext);
        committer.commitTask(tContext);

        describe("committing job");
        committer.commitJob(jContext);
        describe("commit complete\n");

        describe("cleanup");
        committer.cleanupJob(jContext);
        final boolean expectMarker = shouldExpectSuccessMarker();
        validateContent(outputDir, expectMarker, committer.getJobUniqueId());

        describe("Attempting commit of the same task after job commit -expecting failure");
        expectFNFEonTaskCommit(committer, tContext);
    }

    /**
     * Writes output from two attempts of the same task (the second with the
     * output basename "attempt2"), commits both attempts and the job, then
     * asserts that exactly one file was committed and that it is the second
     * attempt's output.
     * @throws Exception on any failure
     */
    @Test
    public void testTwoTaskAttemptsCommit() throws Exception {
        describe("Commit two task attempts; expect the second attempt to succeed.");
        final JobData jobData = startJob(false);
        final TaskAttemptContext tContext1 = jobData.tContext;
        final ManifestCommitter committer1 = jobData.committer;

        describe("\ncommitting task");
        final Path output1 = writeTextOutput(tContext1);

        // second attempt of the same task, writing under a different basename
        final Configuration conf = jobData.conf;
        conf.set("mapreduce.output.basename", "attempt2");
        final TaskAttemptID attemptId2 = TaskAttemptID.forName("attempt_" + jobId + "_m_000000_1");
        final TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf, attemptId2);
        final ManifestCommitter committer2 = localCommitterFactory.createCommitter(tContext2);
        setupCommitter(committer2, tContext2);

        Assertions.assertThat(committer1.getWorkPath())
            .describedAs("Working dir of %s", committer1)
            .isNotEqualTo(committer2.getWorkPath());

        final Path output2 = writeTextOutput(tContext2);
        final String name1 = output1.getName();
        final String name2 = output2.getName();
        Assertions.assertThat(name1)
            .describedAs("name of task attempt output %s", output1)
            .isNotEqualTo(name2);

        committer1.commitTask(tContext1);
        committer2.commitTask(tContext2);
        committer2.commitJob(tContext1);

        Assertions.assertThat(
                ManifestCommitterTestSupport.validateSuccessFile(getFileSystem(), outputDir, 1, "").getFilenames())
            .describedAs("Files committed")
            .hasSize(1);
        assertPathExists("attempt2 output", new Path(outputDir, name2));
        assertPathDoesNotExist("attempt1 output", new Path(outputDir, name1));
    }

    /**
     * @return true: content validation should verify the success marker.
     */
    protected boolean shouldExpectSuccessMarker() {
        return true;
    }

    /**
     * Expect the job commit to fail with a FileNotFoundException.
     * @param jobContext job to commit
     * @param manifestCommitter committer to invoke
     * @throws Exception if the commit did not fail as expected
     */
    protected void expectJobCommitToFail(JobContext jobContext, ManifestCommitter manifestCommitter) throws Exception {
        expectJobCommitFailure(jobContext, manifestCommitter, FileNotFoundException.class);
    }

    /**
     * Invoke the job commit and expect it to raise the given exception class.
     * @param jobContext job to commit
     * @param manifestCommitter committer to invoke
     * @param cls expected exception class
     * @param <E> type of the expected exception
     * @return the exception caught by {@code LambdaTestUtils.intercept}
     * @throws Exception if the commit did not fail as expected
     */
    protected static <E extends IOException> E expectJobCommitFailure(JobContext jobContext, ManifestCommitter manifestCommitter, Class<E> cls) throws Exception {
        final E caught = (E) LambdaTestUtils.intercept(cls, () -> {
            manifestCommitter.commitJob(jobContext);
            // returned only if the commit unexpectedly succeeded
            return manifestCommitter.toString();
        });
        return caught;
    }

    /**
     * Invoke the task commit and expect a FileNotFoundException.
     * @param manifestCommitter committer to invoke
     * @param taskAttemptContext task attempt to commit
     * @throws Exception if the commit did not fail as expected
     */
    protected static void expectFNFEonTaskCommit(ManifestCommitter manifestCommitter, TaskAttemptContext taskAttemptContext) throws Exception {
        final Class<FileNotFoundException> expected = FileNotFoundException.class;
        LambdaTestUtils.intercept(expected, () -> {
            manifestCommitter.commitTask(taskAttemptContext);
            // returned only if the commit unexpectedly succeeded
            return manifestCommitter.toString();
        });
    }

    /**
     * Starts a job through the local committer factory without writing any
     * output, commits the task, and asserts that the task attempt
     * directory exists afterwards.
     */
    @Test
    public void testCommitterWithNoOutputs() throws Exception {
        describe("Have a task and job with no outputs: expect success");
        final JobData jobData = startJob(this.localCommitterFactory, false);
        final TaskAttemptContext tContext = jobData.tContext;
        final ManifestCommitter committer = jobData.committer;

        committer.commitTask(tContext);

        // resolve the filesystem from the attempt path itself, using the task configuration
        final Path attemptPath = committer.getTaskAttemptPath(tContext);
        final FileSystem attemptFs = attemptPath.getFileSystem(tContext.getConfiguration());
        ContractTestUtils.assertPathExists(attemptFs, "task attempt dir", attemptPath);
    }

    /**
     * Writes map file output through {@code MapFileOutputFormat}, commits
     * task and job, then checks the success marker, the output content,
     * that the destination holds exactly one visible child whose name
     * starts with {@code PART_00000}, and that three ways of opening
     * readers each yield a single reader.
     */
    @Test
    public void testMapFileOutputCommitter() throws Exception {
        describe("Test that the committer generates map output into a directory\nstarting with the prefix part-");
        final JobData jobData = startJob(false);
        final JobContext jContext = jobData.jContext;
        final TaskAttemptContext tContext = jobData.tContext;
        final ManifestCommitter committer = jobData.committer;
        final Configuration conf = jobData.conf;

        // write the map file, then commit both the task and the job
        writeMapFileOutput(new MapFileOutputFormat().getRecordWriter(tContext), tContext);
        commitTaskAndJob(committer, jContext, tContext);

        final FileSystem fs = getFileSystem();
        lsR(fs, this.outputDir, true);
        // captured once and reused in the assertion descriptions below
        final String listing = ls(this.outputDir);

        describe("\nvalidating");
        verifySuccessMarker(this.outputDir, committer.getJobUniqueId());

        describe("validate output of %s", this.outputDir);
        validateMapFileOutputContent(fs, this.outputDir);

        describe("listing");
        final FileStatus[] children = fs.listStatus(this.outputDir, HIDDEN_FILE_FILTER);
        Assertions.assertThat(children)
                .describedAs("listed children under %s", listing)
                .hasSize(1);
        final FileStatus onlyChild = children[0];
        Assertions.assertThat(onlyChild.getPath().getName())
                .as("Not the part file: " + onlyChild)
                .startsWith(PART_00000);

        describe("getReaders()");
        Assertions.assertThat(getReaders(fs, this.outputDir, conf))
                .describedAs("getReaders() MapFile.Reader entries with shared FS %s %s", this.outputDir, listing)
                .hasSize(1);

        describe("getReaders(new FS)");
        final FileSystem freshFs = FileSystem.get(this.outputDir.toUri(), conf);
        Assertions.assertThat(getReaders(freshFs, this.outputDir, conf))
                .describedAs("getReaders(new FS) %s %s", this.outputDir, listing)
                .hasSize(1);

        describe("MapFileOutputFormat.getReaders");
        Assertions.assertThat(MapFileOutputFormat.getReaders(this.outputDir, conf))
                .describedAs("MapFileOutputFormat.getReaders(%s) %s", this.outputDir, listing)
                .hasSize(1);
    }

    /**
     * Opens a {@code MapFile.Reader} for every entry listed under a
     * directory with {@code HIDDEN_FILE_FILTER}, in sorted path order.
     *
     * @param fileSystem filesystem used for the listing
     * @param path directory to list
     * @param configuration configuration passed to each reader
     * @return one reader per listed path, in the sorted order of the paths
     * @throws IOException if the listing or opening a reader fails
     */
    private static MapFile.Reader[] getReaders(FileSystem fileSystem, Path path, Configuration configuration) throws IOException {
        final FileStatus[] visibleEntries = fileSystem.listStatus(path, HIDDEN_FILE_FILTER);
        final Path[] sortedPaths = FileUtil.stat2Paths(visibleEntries);
        Arrays.sort(sortedPaths);

        final MapFile.Reader[] readers = new MapFile.Reader[sortedPaths.length];
        int next = 0;
        for (Path entry : sortedPaths) {
            readers[next] = new MapFile.Reader(entry, configuration, new SequenceFile.Reader.Option[0]);
            next++;
        }
        return readers;
    }

    /**
     * Aborts the task through {@code executeWork} without writing any
     * output first.
     */
    @Test
    public void testAbortTaskNoWorkDone() throws Exception {
        executeWork("abort task no work",
                (job, jContext, tContext, committer) -> committer.abortTask(tContext));
    }

    /**
     * Aborts the job, in state {@code RUNNING}, through {@code executeWork}
     * without writing any output first.
     *
     * <p>The description passed to {@code executeWork} previously read
     * "abort task no work", copied from the task-abort test; it now names
     * the operation this test actually performs.
     */
    @Test
    public void testAbortJobNoWorkDone() throws Exception {
        executeWork("abort job no work", (job, jobContext, taskAttemptContext, manifestCommitter) -> {
            manifestCommitter.abortJob(jobContext, JobStatus.State.RUNNING);
        });
    }

    /**
     * Writes task output, then commits the job through a freshly created
     * committer without ever committing the task, and asserts that no
     * part file appears in the output directory.
     */
    @Test
    public void testCommitJobButNotTask() throws Exception {
        final String scenario = "commit a job while a task's work is pending, expect task writes to be cancelled.";
        executeWork(scenario, (job, jContext, tContext, committer) -> {
            // the task writes data but is never committed
            writeTextOutput(tContext);
            // a new committer commits the job; the task context doubles as the job context
            final ManifestCommitter jobCommitter = createCommitter(tContext);
            jobCommitter.commitJob(tContext);
            assertPart0000DoesNotExist(this.outputDir);
        });
    }

    /**
     * Aborts the task, checks that looking up the part file under the
     * work path raises {@link FileNotFoundException}, then aborts the
     * job as {@code FAILED} and verifies the cleanup.
     */
    @Test
    public void testAbortTaskThenJob() throws Exception {
        final JobData jobData = startJob(true);
        final ManifestCommitter committer = jobData.committer;

        committer.abortTask(jobData.tContext);
        LambdaTestUtils.intercept(FileNotFoundException.class, "", () -> {
            return getPart0000(committer.getWorkPath());
        });

        committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
        assertJobAbortCleanedUp(jobData);
    }

    /**
     * Asserts that the output directory has no children after a job
     * abort. A {@link FileNotFoundException} raised anywhere in the check
     * is swallowed, so a missing output directory passes.
     *
     * @param jobData job data; not read by this implementation
     * @throws Exception on any failure other than {@code FileNotFoundException}
     */
    public void assertJobAbortCleanedUp(JobData jobData) throws Exception {
        final FileSystem fs = getFileSystem();
        try {
            final FileStatus[] children = ContractTestUtils.listChildren(fs, this.outputDir);
            final boolean hasLeftovers = children.length != 0;
            if (hasLeftovers) {
                // dump the tree before the assertion below fails
                lsR(fs, this.outputDir, true);
            }
            Assertions.assertThat(children)
                    .as("Output directory not empty " + ls(this.outputDir))
                    .containsExactly(new FileStatus[0]);
        } catch (FileNotFoundException ignored) {
            // deliberately ignored: the exception is not treated as a failure of this check
        }
    }

    /**
     * Aborts the task, aborts the job as {@code FAILED}, verifies that the
     * attempt paths are gone and that the loaded report records the
     * abort stage, then aborts the job a second time.
     */
    @Test
    public void testFailAbort() throws Exception {
        describe("Abort the task, then job (failed), abort the job again");
        final JobData jobData = startJob(true);
        final JobContext jContext = jobData.jContext;
        final TaskAttemptContext tContext = jobData.tContext;
        final ManifestCommitter committer = jobData.committer;

        committer.abortTask(tContext);
        // results are not used; the calls are kept as in the original sequence
        committer.getJobAttemptPath(jContext);
        committer.getTaskAttemptPath(tContext);
        assertPart0000DoesNotExist(this.outputDir);
        assertSuccessMarkerDoesNotExist(this.outputDir);

        describe("Aborting job into %s", this.outputDir);
        committer.abortJob(jContext, JobStatus.State.FAILED);
        assertTaskAttemptPathDoesNotExist(committer, tContext);
        assertJobAttemptPathDoesNotExist(committer, jContext);

        // the report must name the abort stage and count it exactly once
        final String abortStage = "job_stage_abort";
        final ManifestSuccessData report = loadReport(jobData.jobId(), false);
        Assertions.assertThat(report.getDiagnostics())
                .describedAs("Stage entry in report")
                .containsEntry("stage", abortStage);
        IOStatisticAssertions.verifyStatisticCounterValue(report.getIOStatistics(), abortStage, 1L);

        // a repeated abort must also complete without raising
        committer.abortJob(jContext, JobStatus.State.FAILED);
    }

    /**
     * Asserts that no {@code _SUCCESS} entry exists directly under the
     * given directory.
     *
     * @param path directory to check
     * @throws IOException propagated from the path check
     */
    protected void assertSuccessMarkerDoesNotExist(Path path) throws IOException {
        final Path marker = new Path(path, "_SUCCESS");
        assertPathDoesNotExist("Success marker", marker);
    }

    /**
     * Asserts that the part file is absent from a directory in two ways:
     * {@code getPart0000} must raise {@link FileNotFoundException}, and
     * the {@code PART_00000} child path must not exist.
     *
     * @param path directory to check
     * @throws Exception anything propagated by the two checks
     */
    public void assertPart0000DoesNotExist(Path path) throws Exception {
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
            return getPart0000(path);
        });
        final Path expectedPart = new Path(path, PART_00000);
        assertPathDoesNotExist("expected output file", expectedPart);
    }

    /**
     * Writes task output, aborts the job (state {@code RUNNING}) without
     * aborting the task first, and asserts that both the task attempt
     * path and the job attempt path are gone.
     *
     * <p>The description passed to {@code executeWork} previously read
     * "abort task no work", which matched neither the operation (a job
     * abort) nor the setup (output is written); it now describes the
     * scenario actually exercised.
     */
    @Test
    public void testAbortJobNotTask() throws Exception {
        executeWork("abort job with uncommitted task output", (job, jobContext, taskAttemptContext, manifestCommitter) -> {
            // the task writes output but is neither committed nor aborted
            writeTextOutput(taskAttemptContext);
            manifestCommitter.abortJob(jobContext, JobStatus.State.RUNNING);
            assertTaskAttemptPathDoesNotExist(manifestCommitter, taskAttemptContext);
            assertJobAttemptPathDoesNotExist(manifestCommitter, jobContext);
        });
    }

    /**
     * Runs two task attempts on a two-thread pool, each writing its
     * output beneath {@code SUB_DIR} of its work path and then committing
     * its task; once both have finished the job is committed and the
     * output layout is checked.
     *
     * <p>Fixes over the previous form of this method:
     * <ul>
     *   <li>the executor was shut down in a {@code finally} inside the
     *       submission loop, so the second task was submitted to an
     *       executor that had already been shut down; shutdown and the
     *       wait for termination now happen once, after both tasks are
     *       submitted;</li>
     *   <li>the job commit and the assertions were guarded by a
     *       never-assigned {@code boolean} inside {@code while (true)},
     *       which cannot compile; they now run unconditionally after the
     *       executor has terminated.</li>
     * </ul>
     *
     * <p>The futures returned by {@code submit} are not inspected, as
     * before: a task failure surfaces only through the assertions that
     * follow the job commit.
     */
    @Test
    public void testConcurrentCommitTaskWithSubDir() throws Exception {
        Job newJob = newJob();
        FileOutputFormat.setOutputPath(newJob, this.outputDir);
        Configuration configuration = newJob.getConfiguration();
        JobContextImpl jobContextImpl = new JobContextImpl(configuration, this.taskAttempt0.getJobID());
        ManifestCommitter jobCommitter = createCommitter(new TaskAttemptContextImpl(configuration, this.taskAttempt0));
        jobCommitter.setupJob(jobContextImpl);

        TaskAttemptContextImpl[] taskContexts = {
            new TaskAttemptContextImpl(configuration, this.taskAttempt0),
            new TaskAttemptContextImpl(configuration, this.taskAttempt1)
        };

        // one output format per task; each redirects its work file into SUB_DIR
        TextOutputForTests[] outputFormats = new TextOutputForTests[2];
        for (int i = 0; i < outputFormats.length; i++) {
            outputFormats[i] = new TextOutputForTests<Writable, Object>() {
                public Path getDefaultWorkFile(TaskAttemptContext taskAttemptContext, String str) throws IOException {
                    return new Path(new Path(getOutputCommitter(taskAttemptContext).getWorkPath(), TestManifestCommitProtocol.SUB_DIR), getUniqueFile(taskAttemptContext, getOutputName(taskAttemptContext), str));
                }
            };
        }

        ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < taskContexts.length; i++) {
                // lambdas may only capture effectively final locals
                final int taskIdx = i;
                executor.submit(() -> {
                    OutputCommitter outputCommitter = outputFormats[taskIdx].getOutputCommitter(taskContexts[taskIdx]);
                    outputCommitter.setupTask(taskContexts[taskIdx]);
                    writeOutput(outputFormats[taskIdx].getRecordWriter(taskContexts[taskIdx]), taskContexts[taskIdx]);
                    describe("Committing Task %d", Integer.valueOf(taskIdx));
                    outputCommitter.commitTask(taskContexts[taskIdx]);
                    return null;
                });
            }
        } finally {
            // stop accepting work only after every task is queued, then wait for completion
            executor.shutdown();
            while (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                log().info("Awaiting thread termination!");
            }
        }

        describe("\nCommitting Job");
        jobCommitter.commitJob(jobContextImpl);
        assertPathExists("base output directory", this.outputDir);
        assertPart0000DoesNotExist(this.outputDir);
        Path outputSubDir = new Path(this.outputDir, SUB_DIR);
        assertPathDoesNotExist("Must not end up with sub_dir/sub_dir", new Path(outputSubDir, SUB_DIR));
        validateContent(outputSubDir, false, "");
    }

    /**
     * Expects {@code ManifestCommitterFactory} to raise a
     * {@link PathIOException} when asked for a committer on an
     * {@code s3a://} path.
     */
    @Test
    public void testUnsupportedSchema() throws Throwable {
        LambdaTestUtils.intercept(PathIOException.class, () -> {
            final ManifestCommitterFactory factory = new ManifestCommitterFactory();
            final Path unsupported = new Path("s3a://unsupported/");
            return factory.createOutputCommitter(unsupported, (TaskAttemptContext) null);
        });
    }

    /**
     * Drives a full write, task commit and job commit through
     * {@code TextOutputForTests}: checks the dynamic-partitioning
     * capability on both the committer and a
     * {@code BindingPathOutputCommitter}, writes one record, and verifies
     * the {@code committer_tasks_completed} counter both after the task
     * commit and in the statistics loaded from the success marker.
     */
    @Test
    public void testOutputFormatIntegration() throws Throwable {
        getConfiguration();
        final Job job = newJob();
        assertCommitterFactoryIsManifestCommitter(job, this.outputDir);
        job.setOutputFormatClass(TextOutputForTests.class);

        final Configuration conf = job.getConfiguration();
        conf.set("mapreduce.task.attempt.id", this.attempt0);
        conf.setInt("mapreduce.job.application.attempt.id", 1);
        final JobContextImpl jContext = new JobContextImpl(conf, this.taskAttempt0.getJobID());
        final TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, this.taskAttempt0);

        // instantiate the output format reflectively and take the committer from it
        final TextOutputForTests outputFormat = (TextOutputForTests) ReflectionUtils.newInstance(tContext.getOutputFormatClass(), conf);
        final ManifestCommitter committer = (ManifestCommitter) outputFormat.getOutputCommitter(tContext);

        final String capability = "mapreduce.job.committer.dynamic.partitioning";
        final String capabilityText = "dynamic partitioning capability in committer %s";
        Assertions.assertThat(committer.hasCapability(capability))
                .describedAs(capabilityText, committer)
                .isTrue();
        final BindingPathOutputCommitter bindingCommitter = new BindingPathOutputCommitter(this.outputDir, tContext);
        Assertions.assertThat(bindingCommitter.hasCapability(capability))
                .describedAs(capabilityText, bindingCommitter)
                .isTrue();

        final JobData jobData = new JobData(job, jContext, tContext, committer);
        setupJob(jobData);
        abortInTeardown(jobData);

        // write a single record, validating the destination before and after close
        final TextOutputForTests.LoggingLineRecordWriter recordWriter = outputFormat.m39getRecordWriter(tContext);
        final IntWritable one = new IntWritable(1);
        recordWriter.write(one, one);
        final Path dest = recordWriter.getDest();
        validateTaskAttemptPathDuringWrite(dest, 4L);
        recordWriter.close(tContext);
        validateTaskAttemptPathAfterWrite(dest, 4L);

        Assertions.assertThat(committer.needsTaskCommit(tContext))
                .as("Committer does not have data to commit " + committer)
                .isTrue();
        commitTask(committer, tContext);

        final String tasksCompleted = "committer_tasks_completed";
        final IOStatisticsSnapshot afterTaskCommit = new IOStatisticsSnapshot(committer.getIOStatistics());
        LOG.info("after task commit {}", IOStatisticsLogging.ioStatisticsToPrettyString(afterTaskCommit));
        IOStatisticAssertions.verifyStatisticCounterValue(afterTaskCommit, tasksCompleted, 1L);
        LOG.info("Manifest {}", loadManifest(committer.getTaskManifestPath(tContext)).toJson());

        commitJob(committer, jContext);
        LOG.info("committer iostatistics {}", IOStatisticsLogging.ioStatisticsSourceToString(committer));

        // statistics saved with the success marker must carry the same counter
        final IOStatisticsSnapshot loaded = verifySuccessMarker(this.outputDir, committer.getJobUniqueId()).getIOStatistics();
        LOG.info("loaded statistics {}", loaded);
        IOStatisticAssertions.verifyStatisticCounterValue(loaded, tasksCompleted, 1L);
    }

    /**
     * Builds a new task attempt context on the job's configuration, binds
     * {@code TextOutputForTests} to it, asserts that the new attempt has
     * a non-null output path, and aborts the task through the committer
     * obtained from the output format.
     */
    @Test
    public void testAMWorkflow() throws Throwable {
        describe("Create a committer with a null output path & use as an AM");
        final JobData jobData = startJob(true);
        final JobContext jContext = jobData.jContext;
        final TaskAttemptContext tContext = jobData.tContext;

        final TaskAttemptContextImpl newAttempt = new TaskAttemptContextImpl(jContext.getConfiguration(), this.taskAttempt0);
        final Configuration conf = jContext.getConfiguration();
        TextOutputForTests.bind(conf);
        final OutputFormat outputFormat = (OutputFormat) ReflectionUtils.newInstance(newAttempt.getOutputFormatClass(), conf);

        final Path attemptOutputPath = FileOutputFormat.getOutputPath(newAttempt);
        Assertions.assertThat(attemptOutputPath)
                .as("null output path in new task attempt")
                .isNotNull();

        // the committer comes from the new attempt; the abort uses the started job's task context
        final OutputCommitter newCommitter = outputFormat.getOutputCommitter(newAttempt);
        newCommitter.abortTask(tContext);
    }

    /**
     * Runs two jobs side by side, writing to sibling destination
     * directories, and asserts that each commits its own part file.
     * The first job comes from {@code startJob(true)}; the second is built
     * here with a random job id and a destination next to the first.
     *
     * <p>Cleanup of both output directories now lives in a single
     * {@code finally} block. Previously the cleanup code was duplicated at
     * the end of the {@code try} body and in a {@code catch (Throwable)}
     * that rethrew, so a failure inside the first cleanup ran the cleanup
     * a second time. An unused local and a discarded
     * {@code TaskAttemptID.forName} call were removed as well.
     */
    @Test
    public void testParallelJobsToAdjacentPaths() throws Throwable {
        describe("Run two jobs in parallel, assert they both complete");
        JobData job1Data = startJob(true);
        ManifestCommitter committer1 = job1Data.committer;
        JobContext jContext1 = job1Data.jContext;
        TaskAttemptContext tContext1 = job1Data.tContext;

        // second job: random id, destination beside the first job's output dir
        String jobId2 = ManifestCommitterTestSupport.randomJobId();
        String attempt20 = "attempt_" + jobId2 + "_m_000000_0";
        TaskAttemptID taskAttempt20 = TaskAttemptID.forName(attempt20);
        Path job1Dest = this.outputDir;
        Path job2Dest = new Path(getOutputDir().getParent(), getMethodName() + "job2Dest");
        Assertions.assertThat(job2Dest).describedAs("Job destinations", new Object[0]).isNotEqualTo(job1Dest);

        Job job2 = newJob(job2Dest, unsetUUIDOptions(new JobConf(getConfiguration())), attempt20);
        Configuration conf2 = job2.getConfiguration();
        conf2.setInt("mapreduce.job.application.attempt.id", 1);

        ManifestCommitter committer2 = null;
        try {
            JobContextImpl jContext2 = new JobContextImpl(conf2, taskAttempt20.getJobID());
            TaskAttemptContextImpl tContext2 = new TaskAttemptContextImpl(conf2, taskAttempt20);
            committer2 = createCommitter(job2Dest, tContext2);
            JobData job2Data = new JobData(job2, jContext2, tContext2, committer2);
            setupJob(job2Data);
            abortInTeardown(job2Data);

            // the two committers must not share an output path or a unique job id
            Assertions.assertThat(committer1.getOutputPath()).describedAs("Committer output path of %s and %s", new Object[]{committer1, committer2}).isNotEqualTo(committer2.getOutputPath());
            Assertions.assertThat(committer1.getJobUniqueId()).describedAs("JobUnique IDs of %s and %s", new Object[]{committer1, committer2}).isNotEqualTo(committer2.getJobUniqueId());

            // interleave: job 2 writes and commits its task before job 1 commits anything
            writeTextOutput(tContext2);
            commitTask(committer2, tContext2);
            commitTask(committer1, tContext1);
            commitJob(committer1, jContext1);
            getPart0000(job1Dest);
            commitJob(committer2, jContext2);
            getPart0000(job2Dest);
        } finally {
            // remove both destinations whether the test passed or failed
            FileSystem fileSystem = getFileSystem();
            if (committer1 != null) {
                fileSystem.delete(committer1.getOutputPath(), true);
            }
            if (committer2 != null) {
                fileSystem.delete(committer2.getOutputPath(), true);
            }
        }
    }

    /**
     * Removes the {@code spark.sql.sources.writeJobUUID} option from a
     * configuration, modifying it in place.
     *
     * @param configuration configuration to modify
     * @return the same configuration instance that was passed in
     */
    protected Configuration unsetUUIDOptions(Configuration configuration) {
        final String jobUuidOption = "spark.sql.sources.writeJobUUID";
        configuration.unset(jobUuidOption);
        return configuration;
    }

    /**
     * Asserts that the committer's job attempt path is a directory,
     * resolving the filesystem from that path with the committer's
     * configuration.
     *
     * @param manifestCommitter committer supplying the path and configuration
     * @param jobContext job context used to look up the job attempt path
     * @throws IOException propagated from the filesystem lookup or the check
     */
    protected void assertJobAttemptPathExists(ManifestCommitter manifestCommitter, JobContext jobContext) throws IOException {
        final Path attemptDir = manifestCommitter.getJobAttemptPath(jobContext);
        final FileSystem attemptFs = attemptDir.getFileSystem(manifestCommitter.getConf());
        ContractTestUtils.assertIsDirectory(attemptFs, attemptDir);
    }

    /**
     * Validation hook, going by its name, for a task attempt path while a
     * write is still in progress. The body is empty in this class, so a
     * call has no effect here.
     * NOTE(review): presumably an extension point for subclasses — confirm.
     *
     * @param path path to validate; {@code testOutputFormatIntegration}
     *        passes the record writer's destination
     * @param j numeric argument, {@code 4L} at the call site in
     *        {@code testOutputFormatIntegration}; looks like an expected
     *        length — TODO confirm
     * @throws IOException declared only; this empty body never throws it
     */
    protected void validateTaskAttemptPathDuringWrite(Path path, long j) throws IOException {
    }

    /**
     * Validation hook, going by its name, for a task attempt path after
     * the write has finished. The body is empty in this class, so a call
     * has no effect here.
     * NOTE(review): presumably an extension point for subclasses — confirm.
     *
     * @param path path to validate; {@code testOutputFormatIntegration}
     *        passes the record writer's destination after closing the writer
     * @param j numeric argument, {@code 4L} at the call site in
     *        {@code testOutputFormatIntegration}; looks like an expected
     *        length — TODO confirm
     * @throws IOException declared only; this empty body never throws it
     */
    protected void validateTaskAttemptPathAfterWrite(Path path, long j) throws IOException {
    }

    /**
     * Validation hook, going by its name, for the working directory of a
     * task attempt. The body is empty in this class, so a call has no
     * effect here.
     * NOTE(review): presumably an extension point for subclasses — confirm.
     *
     * @param manifestCommitter committer under test; unused by this empty body
     * @param taskAttemptContext task attempt; unused by this empty body
     * @throws IOException declared only; this empty body never throws it
     */
    protected void validateTaskAttemptWorkingDirectory(ManifestCommitter manifestCommitter, TaskAttemptContext taskAttemptContext) throws IOException {
    }

    /**
     * Commits a task by delegating directly to
     * {@code ManifestCommitter.commitTask}.
     *
     * @param manifestCommitter committer to invoke
     * @param taskAttemptContext task attempt to commit
     * @throws IOException propagated from the committer
     */
    protected void commitTask(ManifestCommitter manifestCommitter, TaskAttemptContext taskAttemptContext) throws IOException {
        manifestCommitter.commitTask(taskAttemptContext);
    }

    /**
     * Commits a job by delegating directly to
     * {@code ManifestCommitter.commitJob}.
     *
     * @param manifestCommitter committer to invoke
     * @param jobContext job to commit
     * @throws IOException propagated from the committer
     */
    protected void commitJob(ManifestCommitter manifestCommitter, JobContext jobContext) throws IOException {
        manifestCommitter.commitJob(jobContext);
    }
}
