/*
 * Decompiled with CFR 0.152.
 */
package parquet.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.OutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.junit.Assert;
import org.junit.Test;
import parquet.cascading.ParquetTBaseScheme;
import parquet.hadoop.thrift.ThriftToParquetFileWriter;
import parquet.hadoop.util.ContextUtil;
import parquet.thrift.test.Name;

public class TestParquetTBaseScheme {
    final String txtInputPath = "src/test/resources/names.txt";
    final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
    final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
    final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";

    @Test
    public void testWrite() throws Exception {
        Path path = new Path("target/test/ParquetTBaseScheme/names-parquet-out");
        FileSystem fs = path.getFileSystem(new Configuration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        TextLine sourceScheme = new TextLine(new Fields(new Comparable[]{"first", "last"}));
        Hfs source = new Hfs((Scheme)sourceScheme, "src/test/resources/names.txt");
        ParquetTBaseScheme sinkScheme = new ParquetTBaseScheme(Name.class);
        Hfs sink = new Hfs((Scheme)sinkScheme, "target/test/ParquetTBaseScheme/names-parquet-out");
        Pipe assembly = new Pipe("namecp");
        assembly = new Each(assembly, (Function)new PackThriftFunction());
        Flow flow = new HadoopFlowConnector().connect("namecp", (Tap)source, (Tap)sink, assembly);
        flow.complete();
    }

    @Test
    public void testRead() throws Exception {
        this.doRead((Scheme)new ParquetTBaseScheme(Name.class));
    }

    @Test
    public void testReadWithoutClass() throws Exception {
        this.doRead((Scheme)new ParquetTBaseScheme());
    }

    private void doRead(Scheme sourceScheme) throws Exception {
        this.createFileForRead();
        Path path = new Path("target/test/ParquetTBaseScheme/names-txt-out");
        FileSystem fs = path.getFileSystem(new Configuration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Hfs source = new Hfs(sourceScheme, "target/test/ParquetTBaseScheme/names-parquet-in");
        TextLine sinkScheme = new TextLine(new Fields(new Comparable[]{"first", "last"}));
        Hfs sink = new Hfs((Scheme)sinkScheme, "target/test/ParquetTBaseScheme/names-txt-out");
        Pipe assembly = new Pipe("namecp");
        assembly = new Each(assembly, (Function)new UnpackThriftFunction());
        Flow flow = new HadoopFlowConnector().connect("namecp", (Tap)source, (Tap)sink, assembly);
        flow.complete();
        String result = FileUtils.readFileToString((File)new File("target/test/ParquetTBaseScheme/names-txt-out/part-00000"));
        Assert.assertEquals((Object)"Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", (Object)result);
    }

    private void createFileForRead() throws Exception {
        Path fileToCreate = new Path("target/test/ParquetTBaseScheme/names-parquet-in/names.parquet");
        Configuration conf = new Configuration();
        FileSystem fs = fileToCreate.getFileSystem(conf);
        if (fs.exists(fileToCreate)) {
            fs.delete(fileToCreate, true);
        }
        TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
        TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
        ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext((Configuration)conf, (TaskAttemptID)taskId), (TProtocolFactory)protocolFactory, Name.class);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        TProtocol protocol = protocolFactory.getProtocol((TTransport)new TIOStreamTransport((OutputStream)baos));
        Name n1 = new Name();
        n1.setFirst_name("Alice");
        n1.setLast_name("Practice");
        Name n2 = new Name();
        n2.setFirst_name("Bob");
        n2.setLast_name("Hope");
        Name n3 = new Name();
        n3.setFirst_name("Charlie");
        n3.setLast_name("Horse");
        n1.write(protocol);
        w.write(new BytesWritable(baos.toByteArray()));
        baos.reset();
        n2.write(protocol);
        w.write(new BytesWritable(baos.toByteArray()));
        baos.reset();
        n3.write(protocol);
        w.write(new BytesWritable(baos.toByteArray()));
        w.close();
    }

    private static class UnpackThriftFunction
    extends BaseOperation
    implements Function {
        private UnpackThriftFunction() {
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            Tuple result = new Tuple();
            Name name = (Name)((Object)arguments.get(0));
            result.add((Comparable)((Object)name.getFirst_name()));
            result.add((Comparable)((Object)name.getLast_name()));
            functionCall.getOutputCollector().add(result);
        }
    }

    private static class PackThriftFunction
    extends BaseOperation
    implements Function {
        private PackThriftFunction() {
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            Tuple result = new Tuple();
            Name name = new Name();
            name.setFirst_name(arguments.getString((Comparable)Integer.valueOf(0)));
            name.setLast_name(arguments.getString((Comparable)Integer.valueOf(1)));
            result.add((Comparable)((Object)name));
            functionCall.getOutputCollector().add(result);
        }
    }
}

