/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.hcatalog.streaming;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestStreaming {
    // Logger used by the tests and by the WriterThd worker threads.
    private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
    // Names of the two table columns created by getTableColumns().
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    // Hive configuration built in the constructor and handed to the metastore client and connections.
    private final HiveConf conf;
    // Metastore client created per test instance; closed in cleanup().
    private final IMetaStoreClient msClient;
    // Metastore URI passed to every HiveEndPoint; the constructor sets it to null.
    final String metaStoreURI;
    // Database, table and field names used by the tests.
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    private static final String[] fieldNames = new String[]{"id", "msg"};
    // Partition values ("Asia", "India") of the partition created in createDbAndTable().
    List<String> partitionVals;
    // Storage location of the most recently created partition; read by the check* helpers.
    private static String partLocation;
    // NOTE(review): dbName2/tblName2 hold the same values as dbName/tblName, so the
    // "second" database set up by setup() is the same one - confirm this is intended.
    private static final String dbName2 = "testing";
    private static final String tblName2 = "alerts";
    // Assigned in the static initializer near the end of the class.
    private static final String[] fieldNames2;
    private final String PART1_CONTINENT = "Asia";
    private final String PART1_COUNTRY = "India";
    // JUnit-managed temporary directory under which the database folder is created.
    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();

    public TestStreaming() throws Exception {
        this.metaStoreURI = null;
        this.partitionVals = new ArrayList<String>(2);
        this.partitionVals.add("Asia");
        this.partitionVals.add("India");
        this.conf = new HiveConf(this.getClass());
        this.conf.set("fs.raw.impl", RawFileSystem.class.getName());
        this.conf.set("fs.default.name", "file:///");
        TxnDbUtil.setConfValues((HiveConf)this.conf);
        if (this.metaStoreURI != null) {
            this.conf.setVar(HiveConf.ConfVars.METASTOREURIS, this.metaStoreURI);
        }
        this.conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        this.conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        TxnDbUtil.cleanDb();
        TxnDbUtil.prepDb();
        this.msClient = new HiveMetaStoreClient(this.conf);
    }

    /**
     * Drops and recreates the test databases and tables before each test.
     *
     * NOTE(review): dbName2/tblName2 currently hold the same values as dbName/tblName, so the
     * second drop/create pair rebuilds the very same database - confirm whether a distinct
     * second database was intended.
     */
    @Before
    public void setup() throws Exception {
        dropDB(this.msClient, dbName);
        createDbAndTable(this.msClient, dbName, tblName, this.partitionVals);

        dropDB(this.msClient, dbName2);
        createDbAndTable(this.msClient, dbName2, tblName2, this.partitionVals);
    }

    /** Closes the metastore client that the constructor opened for this test instance. */
    @After
    public void cleanup() throws Exception {
        this.msClient.close();
    }

    /**
     * Returns the partition key schema of the test table: two string columns,
     * {@code continent} followed by {@code country}.
     */
    private static List<FieldSchema> getPartitionKeys() {
        List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
        for (String keyName : new String[]{"continent", "country"}) {
            partitionKeys.add(new FieldSchema(keyName, "string", ""));
        }
        return partitionKeys;
    }

    /**
     * Verifies the ACID state of the partition at {@code partLocation} and the rows readable from it.
     *
     * <p>Asserts that there are no obsolete or original files, that the number of current delta
     * directories equals {@code numExpectedFiles}, that the smallest/largest transaction ids
     * across those deltas equal {@code minTxn}/{@code maxTxn}, and that reading the single
     * resulting split yields exactly {@code records}, in order.
     *
     * @param minTxn           expected minimum transaction id over all current deltas
     * @param maxTxn           expected maximum transaction id over all current deltas
     * @param buckets          value written to the {@code bucket_count} job property
     * @param numExpectedFiles expected number of current delta directories
     * @param records          expected {@code toString()} form of each row, in read order
     * @throws Exception if the metastore or the file system cannot be read
     */
    private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles, String ... records) throws Exception {
        ValidTxnList txns = this.msClient.getValidTxns();
        AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), this.conf, txns);
        Assert.assertEquals(0, dir.getObsolete().size());
        Assert.assertEquals(0, dir.getOriginalFiles().size());

        // Typed list: iterating a raw List as ParsedDelta does not compile.
        List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta pd : current) {
            System.out.println(pd.getPath().toString());
        }
        Assert.assertEquals(numExpectedFiles, current.size());

        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta pd : current) {
            max = Math.max(max, pd.getMaxTransaction());
            min = Math.min(min, pd.getMinTransaction());
        }
        Assert.assertEquals(minTxn, min);
        Assert.assertEquals(maxTxn, max);

        OrcInputFormat inf = new OrcInputFormat();
        JobConf job = new JobConf();
        job.set("mapred.input.dir", partLocation);
        job.set("bucket_count", Integer.toString(buckets));
        job.set("hive.txn.valid.txns", txns.toString());
        InputSplit[] splits = inf.getSplits(job, 1);
        Assert.assertEquals(1, splits.length);

        RecordReader<NullWritable, OrcStruct> rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
        try {
            NullWritable key = rr.createKey();
            OrcStruct value = rr.createValue();
            for (int i = 0; i < records.length; ++i) {
                Assert.assertEquals(true, rr.next(key, value));
                Assert.assertEquals(records[i], value.toString());
            }
            // No rows beyond the expected ones may be present.
            Assert.assertEquals(false, rr.next(key, value));
        } finally {
            // Release the reader even when an assertion above fails.
            rr.close();
        }
    }

    /**
     * Asserts that the partition at {@code partLocation} has no obsolete files, no original
     * files and no current delta directories visible to the currently valid transactions.
     */
    private void checkNothingWritten() throws Exception {
        ValidTxnList validTxns = this.msClient.getValidTxns();
        Path partitionPath = new Path(partLocation);
        AcidUtils.Directory acidState = AcidUtils.getAcidState(partitionPath, this.conf, validTxns);

        Assert.assertEquals(0, acidState.getObsolete().size());
        Assert.assertEquals(0, acidState.getOriginalFiles().size());
        Assert.assertEquals(0, acidState.getCurrentDirectories().size());
    }

    /**
     * Opens and closes a connection for an endpoint with partition values and for one
     * without, both times without asking for partition creation and without a HiveConf.
     */
    @Test
    public void testEndpointConnection() throws Exception {
        // Endpoint that names a partition.
        HiveEndPoint partitionedEndPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        StreamingConnection partitionedConn = partitionedEndPt.newConnection(false, null);
        partitionedConn.close();

        // Endpoint with no partition values.
        HiveEndPoint plainEndPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", null);
        StreamingConnection plainConn = plainEndPt.newConnection(false, null);
        plainConn.close();
    }

    /**
     * Verifies that opening a connection with {@code createPartIfNotExists = true} creates a
     * partition that did not exist beforehand.
     */
    @Test
    public void testAddPartition() throws Exception {
        List<String> newPartVals = new ArrayList<String>(2);
        newPartVals.add("Asia");
        newPartVals.add("Nepal");
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", newPartVals);

        // Precondition: the partition must not exist yet.
        try {
            this.msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
            Assert.fail("Partition already exists");
        }
        catch (NoSuchObjectException expected) {
            // Expected: the lookup fails because the partition has not been created.
        }

        StreamingConnection connection = endPt.newConnection(true, this.conf);
        try {
            Assert.assertNotNull(connection);
            Partition p = this.msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
            Assert.assertNotNull("Did not find added partition", p);
        } finally {
            // The connection was previously left open; always release it.
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Commits a transaction without writing anything, first through an endpoint with
     * partition values and then through one without, and checks the COMMITTED state each time.
     */
    @Test
    public void testTransactionBatchEmptyCommit() throws Exception {
        // 1) endpoint with partition values
        HiveEndPoint partEndPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter partWriter = new DelimitedInputWriter(fieldNames, ",", partEndPt);
        StreamingConnection partConn = partEndPt.newConnection(false, null);
        TransactionBatch partBatch = partConn.fetchTransactionBatch(10, partWriter);
        partBatch.beginNextTransaction();
        partBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, partBatch.getCurrentTransactionState());
        partBatch.close();
        partConn.close();

        // 2) endpoint without partition values
        HiveEndPoint plainEndPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames2, ",", plainEndPt);
        StreamingConnection plainConn = plainEndPt.newConnection(false, null);
        TransactionBatch plainBatch = plainConn.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        plainBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, plainBatch.getCurrentTransactionState());
        plainBatch.close();
        plainConn.close();
    }

    /**
     * Aborts a transaction without writing anything, first through an endpoint with
     * partition values and then through one without, and checks the ABORTED state each time.
     */
    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {
        // 1) endpoint with partition values
        HiveEndPoint partEndPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter partWriter = new DelimitedInputWriter(fieldNames, ",", partEndPt);
        StreamingConnection partConn = partEndPt.newConnection(true, this.conf);
        TransactionBatch partBatch = partConn.fetchTransactionBatch(10, partWriter);
        partBatch.beginNextTransaction();
        partBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, partBatch.getCurrentTransactionState());
        partBatch.close();
        partConn.close();

        // 2) endpoint without partition values
        HiveEndPoint plainEndPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames, ",", plainEndPt);
        StreamingConnection plainConn = plainEndPt.newConnection(true);
        TransactionBatch plainBatch = plainConn.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        plainBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, plainBatch.getCurrentTransactionState());
        plainBatch.close();
        plainConn.close();
    }

    /**
     * Writes delimited records in two consecutive transactions of one batch and verifies
     * that data becomes visible only after each commit; then repeats a single
     * write-and-commit through an endpoint without partition values.
     */
    @Test
    public void testTransactionBatchCommit_Delimited() throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
        StreamingConnection connection = endPt.newConnection(true, this.conf);

        // 1st transaction
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
        txnBatch.write("1,Hello streaming".getBytes());
        txnBatch.commit();
        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, txnBatch.getCurrentTransactionState());

        // 2nd transaction
        txnBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
        txnBatch.write("2,Welcome to streaming".getBytes());
        // The uncommitted record must not be readable yet.
        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");
        txnBatch.commit();
        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        txnBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, txnBatch.getCurrentTransactionState());
        connection.close();

        // Endpoint without partition values
        endPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", null);
        writer = new DelimitedInputWriter(fieldNames, ",", endPt);
        connection = endPt.newConnection(true);
        txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
        txnBatch.write("1,Hello streaming".getBytes());
        txnBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, txnBatch.getCurrentTransactionState());
        // Close the batch before its connection, as the first half of this test does.
        txnBatch.close();
        connection.close();
    }

    /**
     * Writes one JSON record through a {@link StrictJsonWriter}, commits it and verifies the
     * stored row as well as the batch state after commit and after close.
     */
    @Test
    public void testTransactionBatchCommit_Json() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        StrictJsonWriter jsonWriter = new StrictJsonWriter(endPoint);
        StreamingConnection conn = endPoint.newConnection(true, this.conf);
        TransactionBatch batch = conn.fetchTransactionBatch(10, jsonWriter);

        batch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, batch.getCurrentTransactionState());
        byte[] jsonRecord = "{\"id\" : 1, \"msg\": \"Hello streaming\"}".getBytes();
        batch.write(jsonRecord);
        batch.commit();

        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, batch.getCurrentTransactionState());

        batch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, batch.getCurrentTransactionState());
        conn.close();
    }

    /**
     * Exhausts two transaction batches - committing every transaction of the first and
     * aborting every transaction of the second - and checks that
     * {@code remainingTransactions()} drops by one per started transaction and ends at zero.
     */
    @Test
    public void testRemainingTransactions() throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
        StreamingConnection connection = endPt.newConnection(true, this.conf);

        // First batch: commit every transaction.
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        int expectedRemaining = txnBatch.remainingTransactions();
        for (int batch = 0; txnBatch.remainingTransactions() > 0; ++batch) {
            txnBatch.beginNextTransaction();
            expectedRemaining--;
            Assert.assertEquals(expectedRemaining, txnBatch.remainingTransactions());
            for (int rec = 0; rec < 2; ++rec) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
                txnBatch.write((batch * rec + ",Hello streaming").getBytes());
            }
            txnBatch.commit();
            Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, txnBatch.getCurrentTransactionState());
        }
        Assert.assertEquals(0, txnBatch.remainingTransactions());
        txnBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, txnBatch.getCurrentTransactionState());

        // Second batch: abort every transaction.
        txnBatch = connection.fetchTransactionBatch(10, writer);
        expectedRemaining = txnBatch.remainingTransactions();
        for (int batch = 0; txnBatch.remainingTransactions() > 0; ++batch) {
            txnBatch.beginNextTransaction();
            expectedRemaining--;
            Assert.assertEquals(expectedRemaining, txnBatch.remainingTransactions());
            for (int rec = 0; rec < 2; ++rec) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
                txnBatch.write((batch * rec + ",Hello streaming").getBytes());
            }
            txnBatch.abort();
            Assert.assertEquals(TransactionBatch.TxnState.ABORTED, txnBatch.getCurrentTransactionState());
        }
        Assert.assertEquals(0, txnBatch.remainingTransactions());
        txnBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, txnBatch.getCurrentTransactionState());

        connection.close();
    }

    /**
     * Writes two records, aborts the transaction and verifies that nothing is visible in
     * the partition, both right after the abort and after batch and connection are closed.
     */
    @Test
    public void testTransactionBatchAbort() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter delimitedWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection conn = endPoint.newConnection(false);
        TransactionBatch batch = conn.fetchTransactionBatch(10, delimitedWriter);

        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.abort();

        this.checkNothingWritten();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, batch.getCurrentTransactionState());

        batch.close();
        conn.close();

        // Still nothing visible once everything is closed.
        this.checkNothingWritten();
    }

    /**
     * Aborts one transaction, then writes the same two records in the next transaction of
     * the same batch and commits; only the committed records must be visible.
     */
    @Test
    public void testTransactionBatchAbortAndCommit() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter delimitedWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection conn = endPoint.newConnection(false);
        TransactionBatch batch = conn.fetchTransactionBatch(10, delimitedWriter);

        // Aborted transaction: nothing may become visible.
        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.abort();
        this.checkNothingWritten();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, batch.getCurrentTransactionState());

        // Committed transaction: both records become visible.
        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.commit();
        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        batch.close();
        conn.close();
    }

    /**
     * Commits two transactions in each of two consecutive batches on the same connection
     * and verifies the accumulated rows and delta directories after every commit.
     */
    @Test
    public void testMultipleTransactionBatchCommits() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter delimitedWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection conn = endPoint.newConnection(true, this.conf);

        // First batch
        TransactionBatch firstBatch = conn.fetchTransactionBatch(10, delimitedWriter);
        firstBatch.beginNextTransaction();
        firstBatch.write("1,Hello streaming".getBytes());
        firstBatch.commit();
        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");

        firstBatch.beginNextTransaction();
        firstBatch.write("2,Welcome to streaming".getBytes());
        firstBatch.commit();
        this.checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        firstBatch.close();

        // Second batch on the same connection
        TransactionBatch secondBatch = conn.fetchTransactionBatch(10, delimitedWriter);
        secondBatch.beginNextTransaction();
        secondBatch.write("3,Hello streaming - once again".getBytes());
        secondBatch.commit();
        this.checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}");

        secondBatch.beginNextTransaction();
        secondBatch.write("4,Welcome to streaming - once again".getBytes());
        secondBatch.commit();
        this.checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, secondBatch.getCurrentTransactionState());

        secondBatch.close();
        conn.close();
    }

    /**
     * Interleaves writes and commits of two transaction batches fetched from the same
     * connection and checks, after each step, exactly which rows are visible.
     * The statement order is the point of the test; do not reorder.
     */
    @Test
    public void testInterleavedTransactionBatchCommits() throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
        StreamingConnection connection = endPt.newConnection(false);
        // Acquire both batches, each with its own writer, and open a transaction in each.
        TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch1.beginNextTransaction();
        DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
        TransactionBatch txnBatch2 = connection.fetchTransactionBatch(10, (RecordWriter)writer2);
        txnBatch2.beginNextTransaction();
        // Interleaved writes; nothing is committed yet, so nothing may be visible.
        txnBatch1.write("1,Hello streaming".getBytes());
        txnBatch2.write("3,Hello streaming - once again".getBytes());
        this.checkNothingWritten();
        // Commit the second batch first: only its row is expected (txn range 11-20, one delta).
        txnBatch2.commit();
        this.checkDataWritten(11L, 20L, 1, 1, "{3, Hello streaming - once again}");
        // Commit the first batch: both rows, txn range 1-20 across two deltas.
        txnBatch1.commit();
        this.checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
        // Second round of interleaved writes; uncommitted rows stay invisible.
        txnBatch1.beginNextTransaction();
        txnBatch1.write("2,Welcome to streaming".getBytes());
        txnBatch2.beginNextTransaction();
        txnBatch2.write("4,Welcome to streaming - once again".getBytes());
        this.checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
        // This time the first batch commits before the second.
        txnBatch1.commit();
        this.checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
        txnBatch2.commit();
        this.checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}");
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch1.getCurrentTransactionState());
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch2.getCurrentTransactionState());
        txnBatch1.close();
        txnBatch2.close();
        connection.close();
    }

    /**
     * Runs three {@link WriterThd} threads against the same endpoint concurrently and fails
     * if any of them recorded an uncaught error.
     */
    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
        HiveEndPoint ep = new HiveEndPoint(this.metaStoreURI, "testing", "alerts", this.partitionVals);

        List<WriterThd> writers = new ArrayList<WriterThd>(3);
        for (String row : new String[]{"1,Matrix", "2,Gandhi", "3,Silence"}) {
            writers.add(new WriterThd(ep, row));
        }

        for (WriterThd writer : writers) {
            writer.start();
        }
        // Wait for every writer before inspecting any recorded error.
        for (WriterThd writer : writers) {
            writer.join();
        }
        for (WriterThd writer : writers) {
            if (writer.error != null) {
                Assert.fail("Writer thread" + writer.getName() + " died: " + writer.error.getMessage() + " See log file for stack trace");
            }
        }
    }

    /**
     * Best-effort removal of a database and all of its tables.
     *
     * <p>Any {@link TException} raised along the way is deliberately not propagated, so the
     * method can be called whether or not the database currently exists; the failure is
     * logged at debug level instead of being dropped silently.
     *
     * @param client       metastore client used to list and drop the objects
     * @param databaseName name of the database to remove
     */
    public static void dropDB(IMetaStoreClient client, String databaseName) {
        try {
            for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
                client.dropTable(databaseName, table, true, true);
            }
            client.dropDatabase(databaseName);
        }
        catch (TException e) {
            // Still best effort, but leave a trace so unexpected failures can be diagnosed.
            LOG.debug("Ignoring failure while dropping database " + databaseName, e);
        }
    }

    /**
     * Creates a database located under the temporary folder (addressed through the
     * {@code raw://} scheme), a managed, single-bucket ORC table partitioned by
     * continent/country inside it, and one partition with the given values. The location
     * of that partition is stored in {@code partLocation}.
     *
     * @param client       metastore client used for all create/get calls
     * @param databaseName name of the database to create
     * @param tableName    name of the table to create
     * @param partVals     values of the partition to add
     * @throws Exception if the folder, database, table or partition cannot be created or read
     */
    public void createDbAndTable(IMetaStoreClient client, String databaseName, String tableName, List<String> partVals) throws Exception {
        // Database
        File dbDir = this.dbFolder.newFolder(databaseName + ".db");
        String dbLocation = "raw://" + dbDir.toURI().getPath();
        Database db = new Database();
        db.setName(databaseName);
        db.setLocationUri(dbLocation);
        client.createDatabase(db);

        // SerDe description
        HashMap<String, String> serdeParams = new HashMap<String, String>();
        serdeParams.put("serialization.format", "1");
        SerDeInfo serdeInfo = new SerDeInfo();
        serdeInfo.setName(tableName);
        serdeInfo.setParameters(serdeParams);
        serdeInfo.setSerializationLib(OrcSerde.class.getName());

        // Storage description
        StorageDescriptor sd = new StorageDescriptor();
        sd.setCols(TestStreaming.getTableColumns());
        sd.setNumBuckets(1);
        sd.setLocation(dbLocation + "/" + tableName);
        sd.setBucketCols(new ArrayList<String>(2));
        sd.setSerdeInfo(serdeInfo);
        sd.setInputFormat(HiveInputFormat.class.getName());
        sd.setOutputFormat(OrcOutputFormat.class.getName());

        // Table
        Table tbl = new Table();
        tbl.setDbName(databaseName);
        tbl.setTableName(tableName);
        tbl.setTableType(TableType.MANAGED_TABLE.toString());
        tbl.setPartitionKeys(TestStreaming.getPartitionKeys());
        tbl.setSd(sd);
        tbl.setParameters(new HashMap<String, String>());
        client.createTable(tbl);

        // Partition; an already existing one is acceptable.
        try {
            TestStreaming.addPartition(client, tbl, partVals);
        }
        catch (AlreadyExistsException e) {
            // empty catch block
        }
        Partition createdPartition = client.getPartition(databaseName, tableName, partVals);
        partLocation = createdPartition.getSd().getLocation();
    }

    /**
     * Adds a partition with the given values to {@code tbl}.
     *
     * <p>The partition's storage descriptor is a copy of the table's, with the location
     * extended by the path built by {@link #makePartPath(List, List)}.
     *
     * @param client     metastore client used to register the partition
     * @param tbl        table the partition belongs to; supplies database name, table name,
     *                   storage descriptor and partition keys
     * @param partValues partition values, one per partition key
     * @throws IOException declared for callers; not raised by the visible code
     * @throws TException  if the metastore rejects the partition
     */
    private static void addPartition(IMetaStoreClient client, Table tbl, List<String> partValues) throws IOException, TException {
        Partition part = new Partition();
        part.setDbName(tbl.getDbName());
        // Take the name from the table itself rather than a hard-coded "alerts".
        part.setTableName(tbl.getTableName());
        StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
        sd.setLocation(sd.getLocation() + "/" + TestStreaming.makePartPath(tbl.getPartitionKeys(), partValues));
        part.setSd(sd);
        part.setValues(partValues);
        client.add_partition(part);
    }

    /**
     * Builds the partition sub-path used as the last component of a partition location.
     *
     * <p>The result has the form {@code " ( k1='v1'/k2='v2' )"}: key/value pairs in key
     * order, values single-quoted, pairs separated by {@code /}, wrapped in {@code " ( "}
     * and {@code " )"}.
     *
     * @param partKeys partition key schemas
     * @param partVals partition values, positionally matching {@code partKeys}
     * @return the formatted partition path
     * @throws IllegalArgumentException if the two lists differ in size
     */
    private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
        int keyCount = partKeys.size();
        if (keyCount != partVals.size()) {
            throw new IllegalArgumentException("Partition values:" + partVals + ", does not match the partition Keys in table :" + partKeys);
        }
        StringBuilder path = new StringBuilder(keyCount * 20);
        path.append(" ( ");
        int index = 0;
        for (FieldSchema key : partKeys) {
            if (index > 0) {
                path.append("/");
            }
            path.append(key.getName()).append("='").append(partVals.get(index)).append("'");
            index++;
        }
        path.append(" )");
        return path.toString();
    }

    /**
     * Returns the column schema of the test table: an {@code int} column named by
     * {@code COL1} followed by a {@code string} column named by {@code COL2}.
     */
    private static List<FieldSchema> getTableColumns() {
        List<FieldSchema> columns = new ArrayList<FieldSchema>(2);
        FieldSchema idColumn = new FieldSchema(COL1, "int", "");
        FieldSchema msgColumn = new FieldSchema(COL2, "string", "");
        columns.add(idColumn);
        columns.add(msgColumn);
        return columns;
    }

    // Initializes fieldNames2 with the two column names (COL1, COL2).
    static {
        fieldNames2 = new String[]{COL1, COL2};
    }

    private static class WriterThd
    extends Thread {
        private final StreamingConnection conn;
        private final DelimitedInputWriter writer;
        private final String data;
        private Throwable error;

        WriterThd(HiveEndPoint ep, String data) throws Exception {
            super("Writer_" + data);
            this.writer = new DelimitedInputWriter(fieldNames, ",", ep);
            this.conn = ep.newConnection(false);
            this.data = data;
            this.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread thread, Throwable throwable) {
                    WriterThd.this.error = throwable;
                    LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
                }
            });
        }

        @Override
        public void run() {
            TransactionBatch txnBatch = null;
            try {
                txnBatch = this.conn.fetchTransactionBatch(10, (RecordWriter)this.writer);
                while (txnBatch.remainingTransactions() > 0) {
                    txnBatch.beginNextTransaction();
                    txnBatch.write(this.data.getBytes());
                    txnBatch.write(this.data.getBytes());
                    txnBatch.commit();
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            finally {
                if (txnBatch != null) {
                    try {
                        txnBatch.close();
                    }
                    catch (Exception e) {
                        LOG.error("txnBatch.close() failed: " + e.getMessage(), (Throwable)e);
                        this.conn.close();
                    }
                }
                try {
                    this.conn.close();
                }
                catch (Exception e) {
                    LOG.error("conn.close() failed: " + e.getMessage(), (Throwable)e);
                }
            }
        }
    }

    public static class RawFileSystem
    extends RawLocalFileSystem {
        private static final URI NAME;

        public URI getUri() {
            return NAME;
        }

        public FileStatus getFileStatus(Path path) throws IOException {
            File file = this.pathToFile(path);
            if (!file.exists()) {
                throw new FileNotFoundException("Can't find " + path);
            }
            short mod = 0;
            if (file.canRead()) {
                mod = (short)(mod | 0x124);
            }
            if (file.canWrite()) {
                mod = (short)(mod | 0x80);
            }
            if (file.canExecute()) {
                mod = (short)(mod | 0x49);
            }
            return new FileStatus(file.length(), file.isDirectory(), 1, 1024L, file.lastModified(), file.lastModified(), FsPermission.createImmutable((short)mod), "owen", "users", path);
        }

        static {
            try {
                NAME = new URI("raw:///");
            }
            catch (URISyntaxException se) {
                throw new IllegalArgumentException("bad uri", se);
            }
        }
    }
}

