/*
 * Decompiled with CFR 0.152.
 */
package parquet.scrooge;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.junit.Assert;
import org.junit.Test;
import parquet.cascading.ParquetValueScheme;
import parquet.hadoop.thrift.ThriftToParquetFileWriter;
import parquet.hadoop.util.ContextUtil;
import parquet.scrooge.ParquetScroogeScheme;
import parquet.scrooge.test.Name;
import parquet.scrooge.test.Name$;
import parquet.scrooge.test.RequiredPrimitiveFixture;
import parquet.scrooge.test.TestPersonWithAllInformation;
import parquet.thrift.test.Address;
import parquet.thrift.test.Phone;
import scala.Option;

public class ParquetScroogeSchemeTest {
    public static final String PARQUET_PATH = "target/test/TestParquetToThriftReadProjection/file.parquet";
    public static final String TXT_OUTPUT_PATH = "target/test/TestParquetToThriftReadProjection/output.txt";
    final String txtInputPath = "src/test/resources/names.txt";
    final String parquetOutputPath = "target/test/ParquetScroogeScheme/names-parquet-out";
    final String txtOutputPath = "target/test/ParquetScroogeScheme/names-txt-out";

    @Test
    public void testWritePrimitveThriftReadScrooge() throws Exception {
        parquet.thrift.test.RequiredPrimitiveFixture toWrite = new parquet.thrift.test.RequiredPrimitiveFixture(true, 2, 3, 4, 5L, 6.0, "7");
        toWrite.setInfo_string("it's info");
        this.verifyScroogeRead(this.thriftRecords(toWrite), RequiredPrimitiveFixture.class, "RequiredPrimitiveFixture(true,2,3,4,5,6.0,7,Some(it's info))\n", "**");
    }

    @Test
    public void testNestedReadingInScrooge() throws Exception {
        HashMap<String, Phone> phoneMap = new HashMap<String, Phone>();
        phoneMap.put("key1", new Phone("111", "222"));
        parquet.thrift.test.TestPersonWithAllInformation toWrite = new parquet.thrift.test.TestPersonWithAllInformation(new parquet.thrift.test.Name("first"), new Address("my_street", "my_zip"), phoneMap);
        toWrite.setInfo("my_info");
        String expected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(key1 -> Phone(111,222)),None,None)\n";
        this.verifyScroogeRead(this.thriftRecords(toWrite), TestPersonWithAllInformation.class, expected, "**");
        String expectedProjected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(),None,None)\n";
        this.verifyScroogeRead(this.thriftRecords(toWrite), TestPersonWithAllInformation.class, expectedProjected, "address/*;info;name/first_name");
    }

    public <T> void verifyScroogeRead(List<TBase> recordsToWrite, Class<T> readClass, String expectedStr, String projectionFilter) throws Exception {
        Configuration conf = new Configuration();
        this.deleteIfExist(PARQUET_PATH);
        this.deleteIfExist(TXT_OUTPUT_PATH);
        Path parquetFile = new Path(PARQUET_PATH);
        this.writeParquetFile(recordsToWrite, conf, parquetFile);
        ParquetScroogeScheme sourceScheme = new ParquetScroogeScheme(new ParquetValueScheme.Config().withRecordClass(readClass).withProjectionString(projectionFilter));
        Hfs source = new Hfs((Scheme)sourceScheme, PARQUET_PATH);
        TextLine sinkScheme = new TextLine(new Fields(new Comparable[]{"first", "last"}));
        Hfs sink = new Hfs((Scheme)sinkScheme, TXT_OUTPUT_PATH);
        Pipe assembly = new Pipe("namecp");
        assembly = new Each(assembly, (Function)new ObjectToStringFunction());
        Flow flow = new HadoopFlowConnector().connect("namecp", (Tap)source, (Tap)sink, assembly);
        flow.complete();
        String result = FileUtils.readFileToString((File)new File("target/test/TestParquetToThriftReadProjection/output.txt/part-00000"));
        Assert.assertEquals((Object)expectedStr, (Object)result);
    }

    private void writeParquetFile(List<TBase> recordsToWrite, Configuration conf, Path parquetFile) throws IOException, InterruptedException, TException {
        TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
        TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
        Class<?> writeClass = recordsToWrite.get(0).getClass();
        ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext((Configuration)conf, (TaskAttemptID)taskId), (TProtocolFactory)protocolFactory, writeClass);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        TProtocol protocol = protocolFactory.getProtocol((TTransport)new TIOStreamTransport((OutputStream)baos));
        for (TBase recordToWrite : recordsToWrite) {
            recordToWrite.write(protocol);
        }
        w.write(new BytesWritable(baos.toByteArray()));
        w.close();
    }

    private List<TBase> thriftRecords(TBase ... records) {
        ArrayList<TBase> result = new ArrayList<TBase>();
        for (TBase record : records) {
            result.add(record);
        }
        return result;
    }

    private void deleteIfExist(String path) throws IOException {
        Path p = new Path(path);
        Configuration conf = new Configuration();
        FileSystem fs = p.getFileSystem(conf);
        if (fs.exists(p)) {
            fs.delete(p, true);
        }
    }

    @Test
    public void testWriteThenRead() throws Exception {
        this.doWrite();
        this.doRead();
    }

    private void doWrite() throws Exception {
        Path path = new Path("target/test/ParquetScroogeScheme/names-parquet-out");
        FileSystem fs = path.getFileSystem(new Configuration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        TextLine sourceScheme = new TextLine(new Fields(new Comparable[]{"first", "last"}));
        Hfs source = new Hfs((Scheme)sourceScheme, "src/test/resources/names.txt");
        ParquetScroogeScheme sinkScheme = new ParquetScroogeScheme(Name.class);
        Hfs sink = new Hfs((Scheme)sinkScheme, "target/test/ParquetScroogeScheme/names-parquet-out");
        Pipe assembly = new Pipe("namecp");
        assembly = new Each(assembly, (Function)new PackThriftFunction());
        Flow flow = new HadoopFlowConnector().connect("namecp", (Tap)source, (Tap)sink, assembly);
        flow.complete();
    }

    private void doRead() throws Exception {
        Path path = new Path("target/test/ParquetScroogeScheme/names-txt-out");
        FileSystem fs = path.getFileSystem(new Configuration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        ParquetScroogeScheme sourceScheme = new ParquetScroogeScheme(Name.class);
        Hfs source = new Hfs((Scheme)sourceScheme, "target/test/ParquetScroogeScheme/names-parquet-out");
        TextLine sinkScheme = new TextLine(new Fields(new Comparable[]{"first", "last"}));
        Hfs sink = new Hfs((Scheme)sinkScheme, "target/test/ParquetScroogeScheme/names-txt-out");
        Pipe assembly = new Pipe("namecp");
        assembly = new Each(assembly, (Function)new UnpackThriftFunction());
        Flow flow = new HadoopFlowConnector().connect("namecp", (Tap)source, (Tap)sink, assembly);
        flow.complete();
        String result = FileUtils.readFileToString((File)new File("target/test/ParquetScroogeScheme/names-txt-out/part-00000"));
        Assert.assertEquals((Object)"0\tAlice\tPractice\n15\tBob\tHope\n24\tCharlie\tHorse\n", (Object)result);
    }

    private static class UnpackThriftFunction
    extends BaseOperation
    implements Function {
        private UnpackThriftFunction() {
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            Tuple result = new Tuple();
            Name name = (Name)arguments.getObject(0);
            result.add((Comparable)((Object)name.firstName()));
            result.add((Comparable)name.lastName().get());
            functionCall.getOutputCollector().add(result);
        }
    }

    private static class PackThriftFunction
    extends BaseOperation
    implements Function {
        private PackThriftFunction() {
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            Tuple result = new Tuple();
            Name name = Name$.MODULE$.apply(arguments.getString((Comparable)Integer.valueOf(0)), (Option<String>)Option.apply((Object)arguments.getString((Comparable)Integer.valueOf(1))));
            result.add((Object)name);
            functionCall.getOutputCollector().add(result);
        }
    }

    private static class ObjectToStringFunction
    extends BaseOperation
    implements Function {
        private ObjectToStringFunction() {
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            Object record = functionCall.getArguments().getObject(0);
            Tuple result = new Tuple();
            result.add((Comparable)((Object)record.toString()));
            functionCall.getOutputCollector().add(result);
        }
    }
}

