/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.security;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class TestBinaryTokenFile {
    private static MiniMRCluster mrCluster;
    private static MiniDFSCluster dfsCluster;
    private static final Path TEST_DIR;
    private static final Path binaryTokenFileName;
    private static int numSlaves;
    private static JobConf jConf;
    private static Path p1;

    @BeforeClass
    public static void setUp() throws Exception {
        Configuration conf = new Configuration();
        dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
        jConf = new JobConf(conf);
        mrCluster = new MiniMRCluster(0, 0, numSlaves, dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, jConf);
        NameNodeAdapter.getDtSecretManager((FSNamesystem)dfsCluster.getNamesystem()).startThreads();
        FileSystem fs = dfsCluster.getFileSystem();
        p1 = new Path("file1");
        p1 = fs.makeQualified(p1);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (mrCluster != null) {
            mrCluster.shutdown();
        }
        mrCluster = null;
        if (dfsCluster != null) {
            dfsCluster.shutdown();
        }
        dfsCluster = null;
    }

    @Test
    public void testBinaryTokenFile() throws IOException {
        System.out.println("running dist job");
        jConf = mrCluster.createJobConf();
        String nnUri = dfsCluster.getURI(0).toString();
        jConf.set("mapreduce.job.hdfs-servers", nnUri + "," + nnUri);
        jConf.set("mapreduce.jobtracker.kerberos.principal", "jt_id");
        String[] args = new String[]{"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"};
        int res = -1;
        try {
            res = ToolRunner.run((Configuration)jConf, (Tool)new MySleepJob(), (String[])args);
        }
        catch (Exception e) {
            System.out.println("Job failed with" + e.getLocalizedMessage());
            e.printStackTrace(System.out);
            Assert.fail((String)"Job failed");
        }
        Assert.assertEquals((String)"dist job res is not 0", (long)res, (long)0L);
    }

    static {
        TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"));
        binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary");
        numSlaves = 1;
    }

    class MySleepJob
    extends SleepJob {
        MySleepJob() {
        }

        @Override
        public Job createJob(int numMapper, int numReducer, long mapSleepTime, int mapSleepCount, long reduceSleepTime, int reduceSleepCount) throws IOException {
            Job job = super.createJob(numMapper, numReducer, mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount);
            job.setMapperClass(MySleepMapper.class);
            this.setupBinaryTokenFile(job);
            return job;
        }

        private void setupBinaryTokenFile(Job job) {
            try {
                Credentials cred1 = new Credentials();
                Credentials cred2 = new Credentials();
                TokenCache.obtainTokensForNamenodesInternal((Credentials)cred1, (Path[])new Path[]{p1}, (Configuration)job.getConfiguration());
                for (Token t : cred1.getAllTokens()) {
                    cred2.addToken(new Text("Hdfs"), t);
                }
                DataOutputStream os = new DataOutputStream(new FileOutputStream(binaryTokenFileName.toString()));
                cred2.writeTokenStorageToStream(os);
                os.close();
                job.getConfiguration().set("mapreduce.job.credentials.binary", binaryTokenFileName.toString());
            }
            catch (IOException e) {
                Assert.fail((String)("Exception " + e));
            }
        }
    }

    static class MySleepMapper
    extends SleepJob.SleepMapper {
        MySleepMapper() {
        }

        @Override
        public void map(IntWritable key, IntWritable value, Mapper.Context context) throws IOException, InterruptedException {
            Credentials ts = context.getCredentials();
            Collection dts = ts.getAllTokens();
            if (dts.size() != 2) {
                throw new RuntimeException("tokens are not available");
            }
            Token dt = ts.getToken(new Text("Hdfs"));
            String tokenFile = context.getConfiguration().get("mapreduce.job.credentials.binary");
            Credentials cred = new Credentials();
            cred.readTokenStorageStream(new DataInputStream(new FileInputStream(tokenFile)));
            for (Token t : cred.getAllTokens()) {
                if (dt.equals((Object)t)) continue;
                throw new RuntimeException("Delegation token in job is not same as the token passed in file. tokenInFile=" + t + ", dt=" + dt);
            }
            super.map(key, value, context);
        }
    }
}

