/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.math.BigInteger;
import java.net.InetAddress;
import java.security.Key;
import java.security.KeyPair;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import javax.security.auth.x500.X500Principal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.tez.mapreduce.examples.TestOrderedWordCount;
import org.apache.tez.test.MiniTezCluster;
import org.bouncycastle.asn1.ASN1Encodable;
import org.bouncycastle.asn1.x509.GeneralName;
import org.bouncycastle.asn1.x509.GeneralNames;
import org.bouncycastle.asn1.x509.X509Extensions;
import org.bouncycastle.x509.X509V3CertificateGenerator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestSecureShuffle {
    private static MiniDFSCluster miniDFSCluster;
    private static MiniTezCluster miniTezCluster;
    private static Configuration conf;
    private static FileSystem fs;
    private static Path inputLoc;
    private static String TEST_ROOT_DIR;
    private static File keysStoresDir;
    private boolean enableSSLInCluster;
    private int resultWithTezSSL;
    private int resultWithoutTezSSL;
    private boolean asyncHttp;

    public TestSecureShuffle(boolean sslInCluster, int resultWithTezSSL, int resultWithoutTezSSL, boolean asyncHttp) {
        this.enableSSLInCluster = sslInCluster;
        this.resultWithTezSSL = resultWithTezSSL;
        this.resultWithoutTezSSL = resultWithoutTezSSL;
        this.asyncHttp = asyncHttp;
    }

    @Parameterized.Parameters(name="test[sslInCluster:{0}, resultWithTezSSL:{1}, resultWithoutTezSSL:{2}, asyncHttp:{3}]")
    public static Collection<Object[]> getParameters() {
        ArrayList<Object[]> parameters = new ArrayList<Object[]>();
        parameters.add(new Object[]{true, 0, 1, false});
        parameters.add(new Object[]{true, 0, 1, true});
        parameters.add(new Object[]{false, 1, 0, true});
        parameters.add(new Object[]{false, 1, 0, false});
        return parameters;
    }

    @BeforeClass
    public static void setupDFSCluster() throws Exception {
        conf = new Configuration();
        conf.setBoolean("dfs.namenode.edits.noeditlogchannelflush", false);
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        EditLogFileOutputStream.setShouldSkipFsyncForTesting((boolean)true);
        conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
        miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
        fs = miniDFSCluster.getFileSystem();
        conf.set("fs.defaultFS", fs.getUri().toString());
        conf.setBoolean("tez.runtime.optimize.local.fetch", false);
    }

    @AfterClass
    public static void shutdownDFSCluster() {
        if (miniDFSCluster != null) {
            miniDFSCluster.shutdown();
        }
    }

    @Before
    public void setupTezCluster() throws Exception {
        if (this.enableSSLInCluster) {
            System.setProperty("javax.net.debug", "all");
            TestSecureShuffle.setupKeyStores();
        }
        conf.setBoolean("mapreduce.shuffle.ssl.enabled", this.enableSSLInCluster);
        conf.setInt("tez.runtime.shuffle.connect.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.read.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.fetch.failures.limit", 2);
        conf.setInt("tez.am.task.max.failed.attempts", 1);
        conf.setLong("tez.am.sleep.time.before.exit.millis", 500L);
        conf.setBoolean("tez.runtime.shuffle.use.async.http", this.asyncHttp);
        String sslConf = conf.get("hadoop.ssl.client.conf", "ssl-client.xml");
        conf.addResource(sslConf);
        miniTezCluster = new MiniTezCluster(TestSecureShuffle.class.getName() + "-" + (this.enableSSLInCluster ? "withssl" : "withoutssl"), 1, 1, 1);
        miniTezCluster.init(conf);
        miniTezCluster.start();
        TestSecureShuffle.createSampleFile(inputLoc);
    }

    @After
    public void shutdownTezCluster() throws IOException {
        if (miniTezCluster != null) {
            miniTezCluster.stop();
        }
    }

    private void baseTest(int expectedResult) throws Exception {
        Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
        TestOrderedWordCount wordCount = new TestOrderedWordCount();
        wordCount.setConf(new Configuration(miniTezCluster.getConfig()));
        String[] args = new String[]{"-DUSE_MR_CONFIGS=false", inputLoc.toString(), outputLoc.toString()};
        Assert.assertEquals((long)expectedResult, (long)wordCount.run(args));
    }

    @Test(timeout=500000L)
    public void testSecureShuffle() throws Exception {
        miniTezCluster.getConfig().setBoolean("tez.runtime.shuffle.ssl.enable", true);
        this.baseTest(this.resultWithTezSSL);
        miniTezCluster.getConfig().setBoolean("tez.runtime.shuffle.ssl.enable", false);
        this.baseTest(this.resultWithoutTezSSL);
    }

    private static void createSampleFile(Path inputLoc) throws IOException {
        fs.deleteOnExit(inputLoc);
        FSDataOutputStream out = fs.create(inputLoc);
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter((OutputStream)out));
        for (int i = 0; i < 10; ++i) {
            writer.write("Hello World");
            writer.write("Some other line");
            writer.newLine();
        }
        writer.close();
    }

    private static void setupKeyStores() throws Exception {
        keysStoresDir.mkdirs();
        String sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSecureShuffle.class);
        TestSecureShuffle.setupSSLConfig(keysStoresDir.getAbsolutePath(), sslConfsDir, conf, true, true, "");
    }

    public static void setupSSLConfig(String keystoresDir, String sslConfDir, Configuration config, boolean useClientCert, boolean trustStore, String excludeCiphers) throws Exception {
        String clientKS = keystoresDir + "/clientKS.jks";
        String clientPassword = "clientP";
        String serverKS = keystoresDir + "/serverKS.jks";
        String serverPassword = "serverP";
        String trustKS = null;
        String trustPassword = "trustP";
        File sslClientConfFile = new File(sslConfDir, KeyStoreTestUtil.getClientSSLConfigFileName());
        File sslServerConfFile = new File(sslConfDir, KeyStoreTestUtil.getServerSSLConfigFileName());
        HashMap<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
        if (useClientCert) {
            KeyPair cKP = KeyStoreTestUtil.generateKeyPair((String)"RSA");
            X509Certificate cCert = TestSecureShuffle.generateCertificate("CN=localhost, O=client", cKP, 30, "SHA1withRSA");
            KeyStoreTestUtil.createKeyStore((String)clientKS, (String)clientPassword, (String)"client", (Key)cKP.getPrivate(), (Certificate)cCert);
            certs.put("client", cCert);
        }
        String localhostName = InetAddress.getLocalHost().getHostName();
        KeyPair sKP = KeyStoreTestUtil.generateKeyPair((String)"RSA");
        X509Certificate sCert = TestSecureShuffle.generateCertificate("CN=" + localhostName + ", O=server", sKP, 30, "SHA1withRSA");
        KeyStoreTestUtil.createKeyStore((String)serverKS, (String)serverPassword, (String)"server", (Key)sKP.getPrivate(), (Certificate)sCert);
        certs.put("server", sCert);
        if (trustStore) {
            trustKS = keystoresDir + "/trustKS.jks";
            KeyStoreTestUtil.createTrustStore((String)trustKS, (String)trustPassword, certs);
        }
        Configuration clientSSLConf = KeyStoreTestUtil.createClientSSLConfig((String)clientKS, (String)clientPassword, (String)clientPassword, (String)trustKS, (String)excludeCiphers);
        Configuration serverSSLConf = KeyStoreTestUtil.createServerSSLConfig((String)serverKS, (String)serverPassword, (String)serverPassword, (String)trustKS, (String)excludeCiphers);
        KeyStoreTestUtil.saveConfig((File)sslClientConfFile, (Configuration)clientSSLConf);
        KeyStoreTestUtil.saveConfig((File)sslServerConfFile, (Configuration)serverSSLConf);
        config.set("hadoop.ssl.hostname.verifier", "ALLOW_ALL");
        config.set("hadoop.ssl.client.conf", sslClientConfFile.getName());
        config.set("hadoop.ssl.server.conf", sslServerConfFile.getName());
        config.setBoolean("hadoop.ssl.require.client.cert", useClientCert);
    }

    public static X509Certificate generateCertificate(String dn, KeyPair pair, int days, String algorithm) throws Exception {
        Date from = new Date();
        Date to = new Date(from.getTime() + (long)days * 86400000L);
        BigInteger sn = new BigInteger(64, new SecureRandom());
        KeyPair keyPair = pair;
        X509V3CertificateGenerator certGen = new X509V3CertificateGenerator();
        String hostName = InetAddress.getLocalHost().getHostName();
        String hostAddress = InetAddress.getLocalHost().getHostAddress();
        certGen.addExtension(X509Extensions.SubjectAlternativeName, false, (ASN1Encodable)new GeneralNames(new GeneralName[]{new GeneralName(7, hostAddress), new GeneralName(2, hostName), new GeneralName(2, "localhost")}));
        X500Principal dnName = new X500Principal(dn);
        certGen.setSerialNumber(sn);
        certGen.setIssuerDN(dnName);
        certGen.setNotBefore(from);
        certGen.setNotAfter(to);
        certGen.setSubjectDN(dnName);
        certGen.setPublicKey(keyPair.getPublic());
        certGen.setSignatureAlgorithm(algorithm);
        X509Certificate cert = certGen.generate(pair.getPrivate());
        return cert;
    }

    static {
        conf = new Configuration();
        inputLoc = new Path("/tmp/sample.txt");
        TEST_ROOT_DIR = "target/" + TestSecureShuffle.class.getName() + "-tmpDir";
        keysStoresDir = new File(TEST_ROOT_DIR, "keystores");
    }
}

