package org.apache.hive.hcatalog.streaming;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming.class */
public class TestStreaming {
    // Hive configuration built in the constructor and passed to AcidUtils in the check helpers.
    private final HiveConf conf;
    // Metastore client created in the constructor from conf; used for setup and verification.
    private final IMetaStoreClient msClient;
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    // Storage location of the partition created by createDbAndTable; read by the check helpers.
    private static String partLocation;
    // NOTE: dbName2/tblName2 hold the same values as dbName/tblName.
    private static final String dbName2 = "testing";
    private static final String tblName2 = "alerts";
    // Table column names; also used as the field names handed to DelimitedInputWriter.
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    private static final String[] fieldNames = {COL1, COL2};
    private static final String[] fieldNames2 = {COL1, COL2};
    // Always null here; the constructor only sets METASTOREURIS when this is non-null.
    final String metaStoreURI = null;
    private final String PART1_CONTINENT = "Asia";
    private final String PART1_COUNTRY = "India";

    // JUnit rule supplying the temporary folder under which createDbAndTable places the database.
    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();
    // Partition values (continent, country) filled in by the constructor.
    List<String> partitionVals = new ArrayList(2);

    /* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming$RawFileSystem.class */
    public static class RawFileSystem extends RawLocalFileSystem {
        private static final URI NAME;

        public URI getUri() {
            return NAME;
        }

        public FileStatus getFileStatus(Path path) throws IOException {
            File pathToFile = pathToFile(path);
            if (!pathToFile.exists()) {
                throw new FileNotFoundException("Can't find " + path);
            }
            short s = 0;
            if (pathToFile.canRead()) {
                s = (short) (0 | 292);
            }
            if (pathToFile.canWrite()) {
                s = (short) (s | 128);
            }
            if (pathToFile.canExecute()) {
                s = (short) (s | 73);
            }
            return new FileStatus(pathToFile.length(), pathToFile.isDirectory(), 1, 1024L, pathToFile.lastModified(), pathToFile.lastModified(), FsPermission.createImmutable(s), "owen", "users", path);
        }

        static {
            try {
                NAME = new URI("raw:///");
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("bad uri", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming$WriterThd.class */
    /**
     * Thread that opens its own connection to an end point, fetches one batch
     * of 1000 transactions and, in every transaction, writes the same record
     * twice before committing.
     */
    class WriterThd extends Thread {
        private final StreamingConnection conn;
        private final HiveEndPoint ep;
        private final DelimitedInputWriter writer;
        private final String data;

        /**
         * @param hiveEndPoint end point to connect to (the connection is opened here)
         * @param str          delimited record written in every transaction
         * @throws Exception if the writer or the connection cannot be created
         */
        WriterThd(HiveEndPoint hiveEndPoint, String str) throws Exception {
            this.ep = hiveEndPoint;
            this.writer = new DelimitedInputWriter(TestStreaming.fieldNames, ",", hiveEndPoint);
            this.conn = hiveEndPoint.newConnection(false);
            this.data = str;
        }

        /**
         * Runs all transactions of the batch. Any failure is rethrown as a
         * {@link RuntimeException}; the batch and the connection are each
         * closed exactly once, whether or not the writes succeeded.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            TransactionBatch batch = null;
            try {
                batch = this.conn.fetchTransactionBatch(1000, this.writer);
                while (batch.remainingTransactions() > 0) {
                    batch.beginNextTransaction();
                    batch.write(this.data.getBytes());
                    batch.write(this.data.getBytes());
                    batch.commit();
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever inspects this thread.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException(e);
            } finally {
                closeResources(batch);
            }
        }

        /** Closes the batch (when one was fetched) and then, always, the connection. */
        private void closeResources(TransactionBatch batch) {
            try {
                if (batch != null) {
                    batch.close();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                this.conn.close();
            }
        }
    }

    /**
     * Prepares the partition values, the Hive configuration, the transaction
     * database and the metastore client shared by all tests.
     *
     * @throws Exception if the transaction database or metastore client cannot be set up
     */
    public TestStreaming() throws Exception {
        this.partitionVals.add(this.PART1_CONTINENT);
        this.partitionVals.add(this.PART1_COUNTRY);

        this.conf = new HiveConf(getClass());
        // Route the "raw" scheme to the test file system defined in this class.
        this.conf.set("fs.raw.impl", RawFileSystem.class.getName());
        TxnDbUtil.setConfValues(this.conf);
        if (this.metaStoreURI != null) {
            this.conf.setVar(HiveConf.ConfVars.METASTOREURIS, this.metaStoreURI);
        }
        this.conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        this.conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);

        // Start from a freshly created transaction database.
        TxnDbUtil.cleanDb();
        TxnDbUtil.prepDb();

        this.msClient = new HiveMetaStoreClient(this.conf);
    }

    /**
     * Drops and recreates the test database and table before each test. The
     * sequence runs twice: once for dbName/tblName and once for
     * dbName2/tblName2, which currently hold the same values.
     */
    @Before
    public void setup() throws Exception {
        dropDB(this.msClient, dbName);
        createDbAndTable(this.msClient, dbName, tblName, this.partitionVals);

        dropDB(this.msClient, dbName2);
        createDbAndTable(this.msClient, dbName2, tblName2, this.partitionVals);
    }

    /** Returns the partition key schema: string columns "continent" and "country". */
    private static List<FieldSchema> getPartitionKeys() {
        List<FieldSchema> keys = new ArrayList<FieldSchema>();
        for (String keyName : new String[]{"continent", "country"}) {
            keys.add(new FieldSchema(keyName, "string", ""));
        }
        return keys;
    }

    /**
     * Verifies the ACID state of the partition directory and the rows readable from it.
     *
     * @param j      expected smallest min-transaction id over the current delta directories
     * @param j2     expected largest max-transaction id over the current delta directories
     * @param i      bucket count passed to the ORC input format
     * @param i2     expected number of current delta directories
     * @param strArr expected rows, in read order, in {@code OrcStruct.toString()} form
     */
    private void checkDataWritten(long j, long j2, int i, int i2, String... strArr) throws Exception {
        ValidTxnList txns = this.msClient.getValidTxns();
        AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), this.conf, txns);
        Assert.assertEquals(0, dir.getObsolete().size());
        Assert.assertEquals(0, dir.getOriginalFiles().size());

        List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta delta : deltas) {
            System.out.println(delta.getPath().toString());
        }
        Assert.assertEquals(i2, deltas.size());

        // Transaction id range covered by all deltas together.
        long lowestTxn = Long.MAX_VALUE;
        long highestTxn = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta delta : deltas) {
            highestTxn = Math.max(highestTxn, delta.getMaxTransaction());
            lowestTxn = Math.min(lowestTxn, delta.getMinTransaction());
        }
        Assert.assertEquals(j, lowestTxn);
        Assert.assertEquals(j2, highestTxn);

        // Read the rows back through the ORC input format as a single split.
        OrcInputFormat inputFormat = new OrcInputFormat();
        JobConf job = new JobConf();
        job.set("mapred.input.dir", partLocation);
        job.set("bucket_count", Integer.toString(i));
        job.set("hive.txn.valid.txns", txns.toString());
        InputSplit[] splits = inputFormat.getSplits(job, 1);
        Assert.assertEquals(1, splits.length);

        RecordReader reader = inputFormat.getRecordReader(splits[0], job, Reporter.NULL);
        NullWritable key = (NullWritable) reader.createKey();
        OrcStruct row = (OrcStruct) reader.createValue();
        for (int idx = 0; idx < strArr.length; idx++) {
            Assert.assertEquals(true, reader.next(key, row));
            Assert.assertEquals(strArr[idx], row.toString());
        }
        // No rows beyond the expected ones.
        Assert.assertEquals(false, reader.next(key, row));
    }

    /** Asserts that the partition directory has no obsolete, original or current delta entries. */
    private void checkNothingWritten() throws Exception {
        ValidTxnList txns = this.msClient.getValidTxns();
        Path partitionPath = new Path(partLocation);
        AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, this.conf, txns);
        Assert.assertEquals(0, dir.getObsolete().size());
        Assert.assertEquals(0, dir.getOriginalFiles().size());
        Assert.assertEquals(0, dir.getCurrentDirectories().size());
    }

    /** Opens and closes a connection, first with partition values and then with none. */
    @Test
    public void testEndpointConnection() throws Exception {
        // End point with partition values.
        HiveEndPoint partitioned = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        StreamingConnection first = partitioned.newConnection(false, (HiveConf) null);
        first.close();

        // End point without partition values.
        HiveEndPoint unpartitioned = new HiveEndPoint(this.metaStoreURI, dbName, tblName, (List<String>) null);
        StreamingConnection second = unpartitioned.newConnection(false, (HiveConf) null);
        second.close();
    }

    /**
     * Checks that opening a connection with the boolean flag set to
     * {@code true} makes a previously missing partition (Asia/Nepal) visible
     * through the metastore client. The connection is closed when the test
     * finishes, whether or not the assertions pass.
     */
    @Test
    public void testAddPartition() throws Exception {
        List<String> newPartVals = new ArrayList<String>(2);
        newPartVals.add("Asia");
        newPartVals.add("Nepal");
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, newPartVals);

        // The partition must not exist before the connection is opened.
        try {
            this.msClient.getPartition(endPoint.database, endPoint.table, endPoint.partitionVals);
            Assert.fail("Partition already exists");
        } catch (NoSuchObjectException expected) {
            // expected: the partition has not been created yet
        }

        StreamingConnection connection = endPoint.newConnection(true, (HiveConf) null);
        try {
            Assert.assertNotNull(connection);
            Assert.assertNotNull("Did not find added partition", this.msClient.getPartition(endPoint.database, endPoint.table, endPoint.partitionVals));
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Commits a transaction without writing anything and expects the
     * COMMITTED state, once with partition values and once without.
     */
    @Test
    public void testTransactionBatchEmptyCommit() throws Exception {
        // 1) end point with partition values
        HiveEndPoint partitionedEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter partitionedWriter = new DelimitedInputWriter(fieldNames, ",", partitionedEndPoint);
        StreamingConnection partitionedConn = partitionedEndPoint.newConnection(false, (HiveConf) null);
        TransactionBatch partitionedBatch = partitionedConn.fetchTransactionBatch(10, partitionedWriter);
        partitionedBatch.beginNextTransaction();
        partitionedBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, partitionedBatch.getCurrentTransactionState());
        partitionedBatch.close();
        partitionedConn.close();

        // 2) end point without partition values
        HiveEndPoint plainEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, (List<String>) null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames2, ",", plainEndPoint);
        StreamingConnection plainConn = plainEndPoint.newConnection(false, (HiveConf) null);
        TransactionBatch plainBatch = plainConn.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        plainBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, plainBatch.getCurrentTransactionState());
        plainBatch.close();
        plainConn.close();
    }

    /**
     * Aborts a transaction without writing anything and expects the ABORTED
     * state, once with partition values and once without.
     */
    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {
        // 1) end point with partition values
        HiveEndPoint partitionedEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter partitionedWriter = new DelimitedInputWriter(fieldNames, ",", partitionedEndPoint);
        StreamingConnection partitionedConn = partitionedEndPoint.newConnection(true);
        TransactionBatch partitionedBatch = partitionedConn.fetchTransactionBatch(10, partitionedWriter);
        partitionedBatch.beginNextTransaction();
        partitionedBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, partitionedBatch.getCurrentTransactionState());
        partitionedBatch.close();
        partitionedConn.close();

        // 2) end point without partition values
        HiveEndPoint plainEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, (List<String>) null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames, ",", plainEndPoint);
        StreamingConnection plainConn = plainEndPoint.newConnection(true);
        TransactionBatch plainBatch = plainConn.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        plainBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, plainBatch.getCurrentTransactionState());
        plainBatch.close();
        plainConn.close();
    }

    /**
     * Writes delimited records in two consecutive transactions and checks
     * that rows only become visible after their commit, then repeats a single
     * write/commit against an end point without partition values. Both
     * batches are closed before their connections are closed.
     */
    @Test
    public void testTransactionBatchCommit_Delimited() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection connection = endPoint.newConnection(true);

        // First transaction: one row, visible after commit.
        TransactionBatch batch = connection.fetchTransactionBatch(10, writer);
        batch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, batch.getCurrentTransactionState());
        batch.write("1,Hello streaming".getBytes());
        batch.commit();
        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, batch.getCurrentTransactionState());

        // Second transaction: the new row must stay invisible until the commit.
        batch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, batch.getCurrentTransactionState());
        batch.write("2,Welcome to streaming".getBytes());
        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");
        batch.commit();
        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        batch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, batch.getCurrentTransactionState());
        connection.close();

        // Same write/commit cycle against an end point without partition values.
        HiveEndPoint plainEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, (List<String>) null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames, ",", plainEndPoint);
        StreamingConnection plainConnection = plainEndPoint.newConnection(true);
        TransactionBatch plainBatch = plainConnection.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, plainBatch.getCurrentTransactionState());
        plainBatch.write("1,Hello streaming".getBytes());
        plainBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, plainBatch.getCurrentTransactionState());
        // Release the batch before the connection, as done for the first batch.
        plainBatch.close();
        plainConnection.close();
    }

    /** Writes one JSON record through a StrictJsonWriter and checks the committed row and batch states. */
    @Test
    public void testTransactionBatchCommit_Json() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        StrictJsonWriter jsonWriter = new StrictJsonWriter(endPoint);
        StreamingConnection connection = endPoint.newConnection(true);
        TransactionBatch batch = connection.fetchTransactionBatch(10, jsonWriter);

        batch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, batch.getCurrentTransactionState());
        byte[] record = "{\"id\" : 1, \"msg\": \"Hello streaming\"}".getBytes();
        batch.write(record);
        batch.commit();

        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, batch.getCurrentTransactionState());

        batch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, batch.getCurrentTransactionState());
        connection.close();
    }

    /**
     * Consumes two batches on one connection, committing every transaction of
     * the first and aborting every transaction of the second, and checks that
     * remainingTransactions() decreases by one per transaction down to zero.
     */
    @Test
    public void testRemainingTransactions() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection connection = endPoint.newConnection(true);

        // Batch 1: commit every transaction.
        TransactionBatch committedBatch = connection.fetchTransactionBatch(10, writer);
        int txnIndex = 0;
        int expectedLeft = committedBatch.remainingTransactions();
        while (committedBatch.remainingTransactions() > 0) {
            committedBatch.beginNextTransaction();
            expectedLeft--;
            Assert.assertEquals(expectedLeft, committedBatch.remainingTransactions());
            for (int rec = 0; rec < 2; rec++) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, committedBatch.getCurrentTransactionState());
                String line = (txnIndex * rec) + ",Hello streaming";
                committedBatch.write(line.getBytes());
            }
            committedBatch.commit();
            Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, committedBatch.getCurrentTransactionState());
            txnIndex++;
        }
        Assert.assertEquals(0, committedBatch.remainingTransactions());
        committedBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, committedBatch.getCurrentTransactionState());

        // Batch 2: abort every transaction.
        TransactionBatch abortedBatch = connection.fetchTransactionBatch(10, writer);
        int abortIndex = 0;
        int expectedLeftToAbort = abortedBatch.remainingTransactions();
        while (abortedBatch.remainingTransactions() > 0) {
            abortedBatch.beginNextTransaction();
            expectedLeftToAbort--;
            Assert.assertEquals(expectedLeftToAbort, abortedBatch.remainingTransactions());
            for (int rec = 0; rec < 2; rec++) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, abortedBatch.getCurrentTransactionState());
                String line = (abortIndex * rec) + ",Hello streaming";
                abortedBatch.write(line.getBytes());
            }
            abortedBatch.abort();
            Assert.assertEquals(TransactionBatch.TxnState.ABORTED, abortedBatch.getCurrentTransactionState());
            abortIndex++;
        }
        Assert.assertEquals(0, abortedBatch.remainingTransactions());
        abortedBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, abortedBatch.getCurrentTransactionState());

        connection.close();
    }

    /** Writes two rows, aborts, and checks that nothing is visible before or after closing. */
    @Test
    public void testTransactionBatchAbort() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection connection = endPoint.newConnection(false);
        TransactionBatch batch = connection.fetchTransactionBatch(10, writer);

        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.abort();

        checkNothingWritten();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, batch.getCurrentTransactionState());

        batch.close();
        connection.close();

        // Still nothing after the batch and connection are closed.
        checkNothingWritten();
    }

    /**
     * Aborts a transaction holding two rows, then writes the same rows in the
     * next transaction of the batch and commits; only the committed rows
     * should be readable.
     */
    @Test
    public void testTransactionBatchAbortAndCommit() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection connection = endPoint.newConnection(false);
        TransactionBatch batch = connection.fetchTransactionBatch(10, writer);

        // Aborted transaction: nothing may become visible.
        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.abort();
        checkNothingWritten();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, batch.getCurrentTransactionState());

        // Committed transaction: both rows become visible.
        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.commit();
        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        batch.close();
        connection.close();
    }

    /**
     * Commits two transactions in each of two consecutive batches on one
     * connection, checking the accumulated rows and delta directories after
     * every commit.
     */
    @Test
    public void testMultipleTransactionBatchCommits() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection connection = endPoint.newConnection(true);

        // First batch.
        TransactionBatch firstBatch = connection.fetchTransactionBatch(10, writer);
        firstBatch.beginNextTransaction();
        firstBatch.write("1,Hello streaming".getBytes());
        firstBatch.commit();
        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}");

        firstBatch.beginNextTransaction();
        firstBatch.write("2,Welcome to streaming".getBytes());
        firstBatch.commit();
        checkDataWritten(1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        firstBatch.close();

        // Second batch: a second delta directory and a wider transaction range are expected.
        TransactionBatch secondBatch = connection.fetchTransactionBatch(10, writer);
        secondBatch.beginNextTransaction();
        secondBatch.write("3,Hello streaming - once again".getBytes());
        secondBatch.commit();
        checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}");

        secondBatch.beginNextTransaction();
        secondBatch.write("4,Welcome to streaming - once again".getBytes());
        secondBatch.commit();
        checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, secondBatch.getCurrentTransactionState());

        secondBatch.close();
        connection.close();
    }

    /**
     * Interleaves writes and commits of two batches fetched from the same
     * connection and checks after each step which rows are readable.
     */
    @Test
    public void testInterleavedTransactionBatchCommits() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        DelimitedInputWriter writerA = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection connection = endPoint.newConnection(false);

        // Open a transaction in each batch; batch B gets its own writer.
        TransactionBatch batchA = connection.fetchTransactionBatch(10, writerA);
        batchA.beginNextTransaction();
        DelimitedInputWriter writerB = new DelimitedInputWriter(fieldNames, ",", endPoint);
        TransactionBatch batchB = connection.fetchTransactionBatch(10, writerB);
        batchB.beginNextTransaction();

        // Uncommitted writes in both batches: nothing visible yet.
        batchA.write("1,Hello streaming".getBytes());
        batchB.write("3,Hello streaming - once again".getBytes());
        checkNothingWritten();

        // Commit B first, then A.
        batchB.commit();
        checkDataWritten(11L, 20L, 1, 1, "{3, Hello streaming - once again}");
        batchA.commit();
        checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");

        // Second round of writes; visibility must not change until each commit.
        batchA.beginNextTransaction();
        batchA.write("2,Welcome to streaming".getBytes());
        batchB.beginNextTransaction();
        batchB.write("4,Welcome to streaming - once again".getBytes());
        checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");

        batchA.commit();
        checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
        batchB.commit();
        checkDataWritten(1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}");

        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, batchA.getCurrentTransactionState());
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, batchB.getCurrentTransactionState());

        batchA.close();
        batchB.close();
        connection.close();
    }

    /**
     * Runs three writer threads against the same end point at once and fails
     * if any of them terminates with an uncaught exception.
     */
    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);

        // All writers (and their connections) are created before any is started.
        WriterThd[] writers = {
            new WriterThd(endPoint, "1,Matrix"),
            new WriterThd(endPoint, "2,Gandhi"),
            new WriterThd(endPoint, "3,Silence")
        };

        // An exception escaping run() would otherwise only kill the thread and
        // leave this test passing; record it so it can be asserted on.
        final List<Throwable> failures = java.util.Collections.synchronizedList(new ArrayList<Throwable>());
        Thread.UncaughtExceptionHandler recorder = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable error) {
                failures.add(error);
            }
        };

        for (WriterThd writer : writers) {
            writer.setUncaughtExceptionHandler(recorder);
            writer.start();
        }
        for (WriterThd writer : writers) {
            writer.join();
        }

        Assert.assertTrue("Writer threads failed: " + failures, failures.isEmpty());
    }

    /**
     * Drops every table of the given database and then the database itself.
     * Any {@link TException} raised on the way is ignored.
     *
     * @param iMetaStoreClient metastore client to use
     * @param str              name of the database to drop
     */
    public static void dropDB(IMetaStoreClient iMetaStoreClient, String str) {
        try {
            List<String> tableNames = iMetaStoreClient.listTableNamesByFilter(str, "", (short) -1);
            for (String tableName : tableNames) {
                iMetaStoreClient.dropTable(str, tableName, true, true);
            }
            iMetaStoreClient.dropDatabase(str);
        } catch (TException ignored) {
            // Best effort — presumably the database may not exist yet on the first run.
        }
    }

    /**
     * Creates a database under the temporary folder, a single-bucket managed
     * ORC table partitioned by continent/country inside it, and the partition
     * for the given values. The partition's storage location is stored in
     * {@code partLocation}.
     *
     * @param iMetaStoreClient metastore client to use
     * @param str              database name
     * @param str2             table name
     * @param list             partition values
     */
    public void createDbAndTable(IMetaStoreClient iMetaStoreClient, String str, String str2, List<String> list) throws Exception {
        // Database located at raw://<temp folder>/<db>.db
        File dbDir = this.dbFolder.newFolder(new String[]{str + ".db"});
        String dbLocation = "raw://" + dbDir.getCanonicalPath();
        Database db = new Database();
        db.setName(str);
        db.setLocationUri(dbLocation);
        iMetaStoreClient.createDatabase(db);

        Table tbl = new Table();
        tbl.setDbName(str);
        tbl.setTableName(str2);
        tbl.setTableType(TableType.MANAGED_TABLE.toString());

        // Storage descriptor: one bucket, located below the database directory.
        StorageDescriptor sd = new StorageDescriptor();
        sd.setCols(getTableColumns());
        sd.setNumBuckets(1);
        sd.setLocation(dbLocation + "/" + str2);
        tbl.setPartitionKeys(getPartitionKeys());
        tbl.setSd(sd);
        sd.setBucketCols(new ArrayList<String>(2));

        // SerDe named after the table, using the ORC serialization library.
        SerDeInfo serde = new SerDeInfo();
        sd.setSerdeInfo(serde);
        serde.setName(tbl.getTableName());
        serde.setParameters(new HashMap<String, String>());
        serde.getParameters().put("serialization.format", "1");
        serde.setSerializationLib(OrcSerde.class.getName());

        sd.setInputFormat(HiveInputFormat.class.getName());
        sd.setOutputFormat(OrcOutputFormat.class.getName());
        tbl.setParameters(new HashMap<String, String>());
        iMetaStoreClient.createTable(tbl);

        try {
            addPartition(iMetaStoreClient, tbl, list);
        } catch (AlreadyExistsException ignored) {
            // An already existing partition is acceptable here.
        }

        Partition created = iMetaStoreClient.getPartition(str, str2, list);
        partLocation = created.getSd().getLocation();
    }

    /**
     * Adds a partition with the given values to {@code table}. The partition
     * reuses a copy of the table's storage descriptor, with its location
     * extended by the path built from the partition keys and values.
     *
     * @param iMetaStoreClient metastore client to use
     * @param table            table that receives the partition
     * @param list             partition values, one per partition key of the table
     * @throws IOException declared by the signature
     * @throws TException  if the metastore rejects the partition
     */
    private static void addPartition(IMetaStoreClient iMetaStoreClient, Table table, List<String> list) throws IOException, TException {
        Partition partition = new Partition();
        partition.setDbName(table.getDbName());
        // Take the name from the table itself, just like the database name,
        // so the helper works for any table and not only "alerts".
        partition.setTableName(table.getTableName());

        StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
        String partitionPath = makePartPath(table.getPartitionKeys(), list);
        partitionSd.setLocation(partitionSd.getLocation() + "/" + partitionPath);
        partition.setSd(partitionSd);

        partition.setValues(list);
        iMetaStoreClient.add_partition(partition);
    }

    /**
     * Builds the partition path fragment {@code " ( k1='v1'/k2='v2' )"} from
     * the partition keys and their values.
     *
     * @param list  partition keys
     * @param list2 partition values, in the same order as the keys
     * @return the path fragment, including the surrounding " ( " and " )"
     * @throws IllegalArgumentException if the number of keys and values differ
     */
    private static String makePartPath(List<FieldSchema> list, List<String> list2) {
        int keyCount = list.size();
        if (keyCount != list2.size()) {
            throw new IllegalArgumentException("Partition values:" + list2 + ", does not match the partition Keys in table :" + list);
        }

        StringBuilder path = new StringBuilder(keyCount * 20);
        path.append(" ( ");
        for (int idx = 0; idx < keyCount; idx++) {
            path.append(list.get(idx).getName())
                .append("='")
                .append(list2.get(idx))
                .append("'");
            // Separator between key/value pairs, but not after the last one.
            if (idx != keyCount - 1) {
                path.append("/");
            }
        }
        path.append(" )");
        return path.toString();
    }

    /** Returns the table columns: COL1 as "int" and COL2 as "string". */
    private static List<FieldSchema> getTableColumns() {
        List<FieldSchema> columns = new ArrayList<FieldSchema>();
        FieldSchema idColumn = new FieldSchema(COL1, "int", "");
        FieldSchema msgColumn = new FieldSchema(COL2, "string", "");
        columns.add(idColumn);
        columns.add(msgColumn);
        return columns;
    }
}
