package org.apache.hive.hcatalog.streaming;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.Thread;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming.class */
public class TestStreaming {
    private final HiveConf conf;
    private Driver driver;
    private final IMetaStoreClient msClient;
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    private static Path partLoc;
    private static Path partLoc2;
    private static final String dbName2 = "testing2";
    private static final String tblName2 = "alerts";
    private static final String dbName3 = "testing3";
    private static final String tblName3 = "dimensionTable";
    private static final String dbName4 = "testing4";
    private static final String tblName4 = "factTable";
    List<String> partitionVals2;
    private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    private static final String[] fieldNames = {COL1, COL2};
    private static final String[] fieldNames2 = {COL1, COL2};
    final String metaStoreURI = null;
    private final String PART1_CONTINENT = "Asia";
    private final String PART1_COUNTRY = "India";

    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();
    List<String> partitionVals = new ArrayList(2);

    /* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming$FaultyWriter.class */
    private static final class FaultyWriter implements RecordWriter {
        private final RecordWriter delegate;
        private boolean shouldThrow;
        static final /* synthetic */ boolean $assertionsDisabled;

        private FaultyWriter(RecordWriter recordWriter) {
            this.shouldThrow = false;
            if (!$assertionsDisabled && recordWriter == null) {
                throw new AssertionError();
            }
            this.delegate = recordWriter;
        }

        public void write(long j, byte[] bArr) throws StreamingException {
            this.delegate.write(j, bArr);
            produceFault();
        }

        public void flush() throws StreamingException {
            this.delegate.flush();
            produceFault();
        }

        public void clear() throws StreamingException {
            this.delegate.clear();
        }

        public void newBatch(Long l, Long l2) throws StreamingException {
            this.delegate.newBatch(l, l2);
        }

        public void closeBatch() throws StreamingException {
            this.delegate.closeBatch();
        }

        private void produceFault() throws StreamingIOFailure {
            if (this.shouldThrow) {
                throw new StreamingIOFailure("Simulated fault occurred");
            }
        }

        void enableErrors() {
            this.shouldThrow = true;
        }

        void disableErrors() {
            this.shouldThrow = false;
        }

        static {
            $assertionsDisabled = !TestStreaming.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming$RawFileSystem.class */
    public static class RawFileSystem extends RawLocalFileSystem {
        private static final URI NAME;

        public URI getUri() {
            return NAME;
        }

        public FileStatus getFileStatus(Path path) throws IOException {
            File pathToFile = pathToFile(path);
            if (!pathToFile.exists()) {
                throw new FileNotFoundException("Can't find " + path);
            }
            short s = 0;
            if (pathToFile.canRead()) {
                s = (short) (0 | 292);
            }
            if (pathToFile.canWrite()) {
                s = (short) (s | 128);
            }
            if (pathToFile.canExecute()) {
                s = (short) (s | 73);
            }
            return new FileStatus(pathToFile.length(), pathToFile.isDirectory(), 1, 1024L, pathToFile.lastModified(), pathToFile.lastModified(), FsPermission.createImmutable(s), "owen", "users", path);
        }

        static {
            try {
                NAME = new URI("raw:///");
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("bad uri", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming$SampleRec.class */
    public static class SampleRec {
        public String field1;
        public int field2;
        public String field3;

        public SampleRec(String str, int i, String str2) {
            this.field1 = str;
            this.field2 = i;
            this.field3 = str2;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SampleRec sampleRec = (SampleRec) obj;
            if (this.field2 != sampleRec.field2) {
                return false;
            }
            if (this.field1 != null) {
                if (!this.field1.equals(sampleRec.field1)) {
                    return false;
                }
            } else if (sampleRec.field1 != null) {
                return false;
            }
            return this.field3 == null ? sampleRec.field3 == null : this.field3.equals(sampleRec.field3);
        }

        public int hashCode() {
            return (31 * ((31 * (this.field1 != null ? this.field1.hashCode() : 0)) + this.field2)) + (this.field3 != null ? this.field3.hashCode() : 0);
        }

        public String toString() {
            return " { '" + this.field1 + "'," + this.field2 + ",'" + this.field3 + "' }";
        }
    }

    /* loaded from: input_file:org/apache/hive/hcatalog/streaming/TestStreaming$WriterThd.class */
    private static class WriterThd extends Thread {
        private final StreamingConnection conn;
        private final DelimitedInputWriter writer;
        private final String data;
        private Throwable error;

        WriterThd(HiveEndPoint hiveEndPoint, String str) throws Exception {
            super("Writer_" + str);
            this.writer = new DelimitedInputWriter(TestStreaming.fieldNames, ",", hiveEndPoint);
            this.conn = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
            this.data = str;
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.hive.hcatalog.streaming.TestStreaming.WriterThd.1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread, Throwable th) {
                    WriterThd.this.error = th;
                    TestStreaming.LOG.error("Thread " + thread.getName() + " died: " + th.getMessage(), th);
                }
            });
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            TransactionBatch transactionBatch = null;
            try {
                try {
                    transactionBatch = this.conn.fetchTransactionBatch(10, this.writer);
                    while (transactionBatch.remainingTransactions() > 0) {
                        transactionBatch.beginNextTransaction();
                        transactionBatch.write(this.data.getBytes());
                        transactionBatch.write(this.data.getBytes());
                        transactionBatch.commit();
                    }
                    if (transactionBatch != null) {
                        try {
                            transactionBatch.close();
                        } catch (Exception e) {
                            TestStreaming.LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
                            this.conn.close();
                        }
                    }
                    try {
                        this.conn.close();
                    } catch (Exception e2) {
                        TestStreaming.LOG.error("conn.close() failed: " + e2.getMessage(), e2);
                    }
                } catch (Throwable th) {
                    if (transactionBatch != null) {
                        try {
                            transactionBatch.close();
                        } catch (Exception e3) {
                            TestStreaming.LOG.error("txnBatch.close() failed: " + e3.getMessage(), e3);
                            this.conn.close();
                        }
                    }
                    try {
                        this.conn.close();
                    } catch (Exception e4) {
                        TestStreaming.LOG.error("conn.close() failed: " + e4.getMessage(), e4);
                    }
                    throw th;
                }
            } catch (Exception e5) {
                throw new RuntimeException(e5);
            }
        }
    }

    public TestStreaming() throws Exception {
        this.partitionVals.add("Asia");
        this.partitionVals.add("India");
        this.partitionVals2 = new ArrayList(1);
        this.partitionVals2.add("India");
        this.conf = new HiveConf(getClass());
        this.conf.set("fs.default.name", "file:///");
        this.conf.set("fs.raw.impl", RawFileSystem.class.getName());
        this.conf.set("hive.enforce.bucketing", "true");
        this.conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
        TxnDbUtil.setConfValues(this.conf);
        if (this.metaStoreURI != null) {
            this.conf.setVar(HiveConf.ConfVars.METASTOREURIS, this.metaStoreURI);
        }
        this.conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        this.conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        this.dbFolder.create();
        TxnDbUtil.cleanDb();
        TxnDbUtil.prepDb();
        this.msClient = new HiveMetaStoreClient(this.conf);
    }

    @Before
    public void setup() throws Exception {
        SessionState.start(new CliSessionState(this.conf));
        this.driver = new Driver(this.conf);
        this.driver.setMaxRows(200002);
        dropDB(this.msClient, dbName);
        String[] strArr = {COL1, COL2};
        String[] strArr2 = {"int", "string"};
        String[] strArr3 = {COL1};
        partLoc = createDbAndTable(this.driver, dbName, "alerts", this.partitionVals, strArr, strArr2, strArr3, new String[]{"Continent", "Country"}, this.dbFolder.getRoot().toString() + File.separator + dbName + ".db", 1);
        dropDB(this.msClient, dbName2);
        partLoc2 = createDbAndTable(this.driver, dbName2, "alerts", null, strArr, strArr2, strArr3, null, this.dbFolder.getRoot().toString() + File.separator + dbName2 + ".db", 2);
        createStoreSales("testing5", this.dbFolder.getRoot().toString() + File.separator + "testing5.db");
        runDDL(this.driver, "drop table testBucketing3.streamedtable");
        runDDL(this.driver, "drop table testBucketing3.finaltable");
        runDDL(this.driver, "drop table testBucketing3.nobucket");
    }

    @After
    public void cleanup() throws Exception {
        this.msClient.close();
        this.driver.close();
    }

    private static List<FieldSchema> getPartitionKeys() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FieldSchema("continent", "string", ""));
        arrayList.add(new FieldSchema("country", "string", ""));
        return arrayList;
    }

    private void createStoreSales(String str, String str2) throws Exception {
        String str3 = "raw://" + new Path(str2).toUri().toString();
        Assert.assertTrue(runDDL(this.driver, "create database IF NOT EXISTS " + str + " location '" + str3 + "'"));
        Assert.assertTrue(runDDL(this.driver, "use " + str));
        Assert.assertTrue(runDDL(this.driver, "drop table if exists store_sales"));
        Assert.assertTrue(runDDL(this.driver, "create table store_sales\n(\n    ss_sold_date_sk           int,\n    ss_sold_time_sk           int,\n    ss_item_sk                int,\n    ss_customer_sk            int,\n    ss_cdemo_sk               int,\n    ss_hdemo_sk               int,\n    ss_addr_sk                int,\n    ss_store_sk               int,\n    ss_promo_sk               int,\n    ss_ticket_number          int,\n    ss_quantity               int,\n    ss_wholesale_cost         decimal(7,2),\n    ss_list_price             decimal(7,2),\n    ss_sales_price            decimal(7,2),\n    ss_ext_discount_amt       decimal(7,2),\n    ss_ext_sales_price        decimal(7,2),\n    ss_ext_wholesale_cost     decimal(7,2),\n    ss_ext_list_price         decimal(7,2),\n    ss_ext_tax                decimal(7,2),\n    ss_coupon_amt             decimal(7,2),\n    ss_net_paid               decimal(7,2),\n    ss_net_paid_inc_tax       decimal(7,2),\n    ss_net_profit             decimal(7,2)\n)\n partitioned by (dt string)\nclustered by (ss_store_sk, ss_promo_sk)\nINTO 4 BUCKETS stored as orc  location '" + (str3 + "/store_sales") + "'  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')"));
        Assert.assertTrue(runDDL(this.driver, "alter table store_sales add partition(dt='2015')"));
    }

    @Test
    public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add("2015");
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, "testing5", "store_sales", arrayList);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, new DelimitedInputWriter(new String[]{"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"}, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            for (int i2 = 0; i2 < 11; i2++) {
                sb.append(i2).append(',');
            }
            for (int i3 = 0; i3 < 12; i3++) {
                sb.append(i + 0.1d).append(',');
            }
            sb.setLength(sb.length() - 1);
            fetchTransactionBatch.write(sb.toString().getBytes());
        }
        fetchTransactionBatch.commit();
        fetchTransactionBatch.close();
        newConnection.close();
        Iterator<String> it = queryTable(this.driver, "select row__id.bucketid, * from testing5.store_sales").iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }

    @Test
    public void testStreamBucketingMatchesRegularBucketing() throws Exception {
        String str = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String str2 = "'" + str + "/streamedtable'";
        runDDL(this.driver, "create database testBucketing3");
        runDDL(this.driver, "use testBucketing3");
        runDDL(this.driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into 100 buckets  stored as orc  location " + str2 + " TBLPROPERTIES ('transactional'='true')");
        runDDL(this.driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + ("'" + str + "/nobucket'"));
        runDDL(this.driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into 100 buckets  stored as orc location " + ("'" + str + "/finaltable'") + " TBLPROPERTIES ('transactional'='true')");
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, "testBucketing3", "streamedtable", (List) null);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(new String[]{"key1", "key2", "data"}, ",", hiveEndPoint);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, delimitedInputWriter);
        fetchTransactionBatch.beginNextTransaction();
        for (String str3 : new String[]{"PSFAHYLZVC,29,EPNMA", "PPPRKWAYAU,96,VUTEE", "MIAOFERCHI,3,WBDSI", "CEGQAZOWVN,0,WCUZL", "XWAKMNSVQF,28,YJVHU", "XBWTSAJWME,2,KDQFO", "FUVLQTAXAY,5,LDSDG", "QTQMDJMGJH,6,QBOMA", "EFLOTLWJWN,71,GHWPS", "PEQNAOJHCM,82,CAAFI", "MOEKQLGZCP,41,RUACR", "QZXMCOPTID,37,LFLWE", "EYALVWICRD,13,JEZLC", "VYWLZAYTXX,16,DMVZX", "OSALYSQIXR,47,HNZVE", "JGKVHKCEGQ,25,KSCJB", "WQFMMYDHET,12,DTRWA", "AJOVAYZKZQ,15,YBKFO", "YAQONWCUAU,31,QJNHZ", "DJBXUEUOEB,35,IYCBL"}) {
            fetchTransactionBatch.write(str3.toString().getBytes());
        }
        fetchTransactionBatch.commit();
        fetchTransactionBatch.close();
        newConnection.close();
        Iterator<String> it = queryTable(this.driver, "select row__id.bucketid, * from streamedtable order by key2").iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        this.driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
        runDDL(this.driver, " insert into finaltable select * from nobucket");
        ArrayList<String> queryTable = queryTable(this.driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
        Iterator<String> it2 = queryTable.iterator();
        while (it2.hasNext()) {
            LOG.error(it2.next());
        }
        Assert.assertTrue(queryTable.isEmpty());
    }

    @Test
    public void testTableValidation() throws Exception {
        String str = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String str2 = "'" + str + "/validation1'";
        runDDL(this.driver, "create database testBucketing3");
        runDDL(this.driver, "use testBucketing3");
        runDDL(this.driver, "create table validation1 ( key1 string, data string ) clustered by ( key1 ) into 100 buckets  stored as orc  location " + str2);
        runDDL(this.driver, "create table validation2 ( key1 string, data string ) clustered by ( key1 ) into 100 buckets  stored as orc  location " + ("'" + str + "/validation2'") + " TBLPROPERTIES ('transactional'='false')");
        try {
            new HiveEndPoint(this.metaStoreURI, "testBucketing3", "validation1", (List) null).newConnection(false, "UT_" + Thread.currentThread().getName());
            Assert.assertTrue("InvalidTable exception was not thrown", false);
        } catch (InvalidTable e) {
        }
        try {
            new HiveEndPoint(this.metaStoreURI, "testBucketing3", "validation2", (List) null).newConnection(false, "UT_" + Thread.currentThread().getName());
            Assert.assertTrue("InvalidTable exception was not thrown", false);
        } catch (InvalidTable e2) {
        }
    }

    private void checkDataWritten(Path path, long j, long j2, int i, int i2, String... strArr) throws Exception {
        ValidTxnList validTxns = this.msClient.getValidTxns();
        AcidUtils.Directory acidState = AcidUtils.getAcidState(path, this.conf, validTxns);
        Assert.assertEquals(0L, acidState.getObsolete().size());
        Assert.assertEquals(0L, acidState.getOriginalFiles().size());
        List<AcidUtils.ParsedDelta> currentDirectories = acidState.getCurrentDirectories();
        System.out.println("Files found: ");
        Iterator it = currentDirectories.iterator();
        while (it.hasNext()) {
            System.out.println(((AcidUtils.ParsedDelta) it.next()).getPath().toString());
        }
        Assert.assertEquals(i2, currentDirectories.size());
        long j3 = Long.MAX_VALUE;
        long j4 = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta parsedDelta : currentDirectories) {
            if (parsedDelta.getMaxTransaction() > j4) {
                j4 = parsedDelta.getMaxTransaction();
            }
            if (parsedDelta.getMinTransaction() < j3) {
                j3 = parsedDelta.getMinTransaction();
            }
        }
        Assert.assertEquals(j, j3);
        Assert.assertEquals(j2, j4);
        OrcInputFormat orcInputFormat = new OrcInputFormat();
        JobConf jobConf = new JobConf();
        jobConf.set("mapred.input.dir", path.toString());
        jobConf.set("bucket_count", Integer.toString(i));
        jobConf.set("schema.evolution.columns", "id,msg");
        jobConf.set("schema.evolution.columns.types", "bigint:string");
        jobConf.set(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
        jobConf.set("hive.txn.valid.txns", validTxns.toString());
        InputSplit[] splits = orcInputFormat.getSplits(jobConf, i);
        Assert.assertEquals(i, splits.length);
        RecordReader recordReader = orcInputFormat.getRecordReader(splits[0], jobConf, Reporter.NULL);
        NullWritable nullWritable = (NullWritable) recordReader.createKey();
        OrcStruct orcStruct = (OrcStruct) recordReader.createValue();
        for (String str : strArr) {
            Assert.assertEquals(true, Boolean.valueOf(recordReader.next(nullWritable, orcStruct)));
            Assert.assertEquals(str, orcStruct.toString());
        }
        Assert.assertEquals(false, Boolean.valueOf(recordReader.next(nullWritable, orcStruct)));
    }

    private void checkNothingWritten(Path path) throws Exception {
        AcidUtils.Directory acidState = AcidUtils.getAcidState(path, this.conf, this.msClient.getValidTxns());
        Assert.assertEquals(0L, acidState.getObsolete().size());
        Assert.assertEquals(0L, acidState.getOriginalFiles().size());
        Assert.assertEquals(0L, acidState.getCurrentDirectories().size());
    }

    @Test
    public void testEndpointConnection() throws Exception {
        new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals).newConnection(false, "UT_" + Thread.currentThread().getName()).close();
        new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", (List) null).newConnection(false, "UT_" + Thread.currentThread().getName()).close();
        try {
            StreamingConnection newConnection = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", (List) null).newConnection(true, "UT_" + Thread.currentThread().getName());
            Assert.assertTrue("ConnectionError was not thrown", false);
            newConnection.close();
        } catch (ConnectionError e) {
            Assert.assertTrue(e.toString().endsWith("doesn't specify any partitions for partitioned table"));
        }
        try {
            StreamingConnection newConnection2 = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", this.partitionVals).newConnection(false, "UT_" + Thread.currentThread().getName());
            Assert.assertTrue("ConnectionError was not thrown", false);
            newConnection2.close();
        } catch (ConnectionError e2) {
            Assert.assertTrue(e2.toString().endsWith("specifies partitions for unpartitioned table"));
        }
    }

    @Test
    public void testAddPartition() throws Exception {
        ArrayList arrayList = new ArrayList(2);
        arrayList.add("Asia");
        arrayList.add("Nepal");
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", arrayList);
        try {
            this.msClient.getPartition(hiveEndPoint.database, hiveEndPoint.table, hiveEndPoint.partitionVals);
            Assert.assertTrue("Partition already exists", false);
        } catch (NoSuchObjectException e) {
        }
        Assert.assertNotNull(hiveEndPoint.newConnection(true, "UT_" + Thread.currentThread().getName()));
        Assert.assertNotNull("Did not find added partition", this.msClient.getPartition(hiveEndPoint.database, hiveEndPoint.table, hiveEndPoint.partitionVals));
    }

    @Test
    public void testTransactionBatchEmptyCommit() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.close();
        newConnection.close();
        HiveEndPoint hiveEndPoint2 = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", (List) null);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(fieldNames2, ",", hiveEndPoint2);
        StreamingConnection newConnection2 = hiveEndPoint2.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch2 = newConnection2.fetchTransactionBatch(10, delimitedInputWriter);
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch2.getCurrentTransactionState());
        fetchTransactionBatch2.close();
        newConnection2.close();
    }

    @Test
    public void testTimeOutReaper() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", (List) null);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(fieldNames2, ",", hiveEndPoint);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(5, delimitedInputWriter);
        fetchTransactionBatch.beginNextTransaction();
        this.conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0L, TimeUnit.SECONDS);
        this.conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
        AcidHouseKeeperService acidHouseKeeperService = new AcidHouseKeeperService();
        acidHouseKeeperService.start(this.conf);
        while (acidHouseKeeperService.getIsAliveCounter() <= Integer.MIN_VALUE) {
            Thread.sleep(100L);
        }
        acidHouseKeeperService.stop();
        try {
            fetchTransactionBatch.commit();
        } catch (TransactionError e) {
            Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
        }
        fetchTransactionBatch.close();
        TransactionBatch fetchTransactionBatch2 = newConnection.fetchTransactionBatch(10, delimitedInputWriter);
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.commit();
        fetchTransactionBatch2.beginNextTransaction();
        int isAliveCounter = acidHouseKeeperService.getIsAliveCounter();
        acidHouseKeeperService.start(this.conf);
        while (acidHouseKeeperService.getIsAliveCounter() <= isAliveCounter) {
            Thread.sleep(100L);
        }
        acidHouseKeeperService.stop();
        try {
            fetchTransactionBatch2.commit();
        } catch (TransactionError e2) {
            Assert.assertTrue("Expected aborted transaction", e2.getCause() instanceof TxnAbortedException);
        }
        fetchTransactionBatch2.close();
        newConnection.close();
    }

    @Test
    public void testHeartbeat() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", (List) null);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(5, new DelimitedInputWriter(fieldNames2, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        ShowLocksRequest showLocksRequest = new ShowLocksRequest();
        showLocksRequest.setDbname(dbName2);
        showLocksRequest.setTablename("alerts");
        ShowLocksResponse showLocks = this.msClient.showLocks(showLocksRequest);
        Assert.assertEquals("Wrong nubmer of locks: " + showLocks, 1L, showLocks.getLocks().size());
        ShowLocksResponseElement showLocksResponseElement = (ShowLocksResponseElement) showLocks.getLocks().get(0);
        long acquiredat = showLocksResponseElement.getAcquiredat();
        long lastheartbeat = showLocksResponseElement.getLastheartbeat();
        fetchTransactionBatch.heartbeat();
        ShowLocksResponse showLocks2 = this.msClient.showLocks(showLocksRequest);
        Assert.assertEquals("Wrong number of locks2: " + showLocks2, 1L, showLocks2.getLocks().size());
        ShowLocksResponseElement showLocksResponseElement2 = (ShowLocksResponseElement) showLocks2.getLocks().get(0);
        Assert.assertEquals("Acquired timestamp didn't match", acquiredat, showLocksResponseElement2.getAcquiredat());
        Assert.assertTrue("Expected new heartbeat (" + showLocksResponseElement2.getLastheartbeat() + ") == old heartbeat(" + lastheartbeat + ")", showLocksResponseElement2.getLastheartbeat() == lastheartbeat);
    }

    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection newConnection = hiveEndPoint.newConnection(true, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.close();
        newConnection.close();
        HiveEndPoint hiveEndPoint2 = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", (List) null);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(fieldNames, ",", hiveEndPoint2);
        StreamingConnection newConnection2 = hiveEndPoint2.newConnection(true, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch2 = newConnection2.fetchTransactionBatch(10, delimitedInputWriter);
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, fetchTransactionBatch2.getCurrentTransactionState());
        fetchTransactionBatch2.close();
        newConnection2.close();
    }

    @Test
    public void testTransactionBatchCommit_Delimited() throws Exception {
        testTransactionBatchCommit_Delimited(null);
    }

    @Test
    public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
        testTransactionBatchCommit_Delimited(Utils.getUGI());
    }

    private void testTransactionBatchCommit_Delimited(UserGroupInformation userGroupInformation) throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection newConnection = hiveEndPoint.newConnection(true, this.conf, userGroupInformation, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint, this.conf, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.write("1,Hello streaming".getBytes());
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.write("2,Welcome to streaming".getBytes());
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        fetchTransactionBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, fetchTransactionBatch.getCurrentTransactionState());
        newConnection.close();
        HiveEndPoint hiveEndPoint2 = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", (List) null);
        StreamingConnection newConnection2 = hiveEndPoint2.newConnection(true, this.conf, userGroupInformation, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch2 = newConnection2.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint2, this.conf, newConnection2));
        fetchTransactionBatch2.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, fetchTransactionBatch2.getCurrentTransactionState());
        fetchTransactionBatch2.write("1,Hello streaming".getBytes());
        fetchTransactionBatch2.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch2.getCurrentTransactionState());
        newConnection2.close();
    }

    @Test
    public void testTransactionBatchCommit_Json() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection newConnection = hiveEndPoint.newConnection(true, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, new StrictJsonWriter(hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.write("{\"id\" : 1, \"msg\": \"Hello streaming\"}".getBytes());
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, fetchTransactionBatch.getCurrentTransactionState());
        newConnection.close();
        Assert.assertEquals(1L, queryTable(this.driver, "select * from testing.alerts").size());
    }

    @Test
    public void testRemainingTransactions() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(fieldNames, ",", hiveEndPoint);
        StreamingConnection newConnection = hiveEndPoint.newConnection(true, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, delimitedInputWriter);
        int i = 0;
        int remainingTransactions = fetchTransactionBatch.remainingTransactions();
        while (fetchTransactionBatch.remainingTransactions() > 0) {
            fetchTransactionBatch.beginNextTransaction();
            remainingTransactions--;
            Assert.assertEquals(remainingTransactions, fetchTransactionBatch.remainingTransactions());
            for (int i2 = 0; i2 < 2; i2++) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, fetchTransactionBatch.getCurrentTransactionState());
                fetchTransactionBatch.write(((i * i2) + ",Hello streaming").getBytes());
            }
            fetchTransactionBatch.commit();
            Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch.getCurrentTransactionState());
            i++;
        }
        Assert.assertEquals(0L, fetchTransactionBatch.remainingTransactions());
        fetchTransactionBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, fetchTransactionBatch.getCurrentTransactionState());
        TransactionBatch fetchTransactionBatch2 = newConnection.fetchTransactionBatch(10, delimitedInputWriter);
        int i3 = 0;
        int remainingTransactions2 = fetchTransactionBatch2.remainingTransactions();
        while (fetchTransactionBatch2.remainingTransactions() > 0) {
            fetchTransactionBatch2.beginNextTransaction();
            remainingTransactions2--;
            Assert.assertEquals(remainingTransactions2, fetchTransactionBatch2.remainingTransactions());
            for (int i4 = 0; i4 < 2; i4++) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, fetchTransactionBatch2.getCurrentTransactionState());
                fetchTransactionBatch2.write(((i3 * i4) + ",Hello streaming").getBytes());
            }
            fetchTransactionBatch2.abort();
            Assert.assertEquals(TransactionBatch.TxnState.ABORTED, fetchTransactionBatch2.getCurrentTransactionState());
            i3++;
        }
        Assert.assertEquals(0L, fetchTransactionBatch2.remainingTransactions());
        fetchTransactionBatch2.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, fetchTransactionBatch2.getCurrentTransactionState());
        newConnection.close();
    }

    @Test
    public void testTransactionBatchAbort() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("1,Hello streaming".getBytes());
        fetchTransactionBatch.write("2,Welcome to streaming".getBytes());
        fetchTransactionBatch.abort();
        checkNothingWritten(partLoc);
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.close();
        newConnection.close();
        checkNothingWritten(partLoc);
    }

    @Test
    public void testTransactionBatchAbortAndCommit() throws Exception {
        String str = "UT_" + Thread.currentThread().getName();
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, str);
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("1,Hello streaming".getBytes());
        fetchTransactionBatch.write("2,Welcome to streaming".getBytes());
        ShowLocksResponse showLocks = this.msClient.showLocks(new ShowLocksRequest());
        Assert.assertEquals("LockCount", 1L, showLocks.getLocksSize());
        Assert.assertEquals("LockType", LockType.SHARED_READ, ((ShowLocksResponseElement) showLocks.getLocks().get(0)).getType());
        Assert.assertEquals("LockState", LockState.ACQUIRED, ((ShowLocksResponseElement) showLocks.getLocks().get(0)).getState());
        Assert.assertEquals("AgentInfo", str, ((ShowLocksResponseElement) showLocks.getLocks().get(0)).getAgentInfo());
        fetchTransactionBatch.abort();
        checkNothingWritten(partLoc);
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, fetchTransactionBatch.getCurrentTransactionState());
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("1,Hello streaming".getBytes());
        fetchTransactionBatch.write("2,Welcome to streaming".getBytes());
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        fetchTransactionBatch.close();
        newConnection.close();
    }

    @Test
    public void testMultipleTransactionBatchCommits() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(fieldNames, ",", hiveEndPoint);
        StreamingConnection newConnection = hiveEndPoint.newConnection(true, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, delimitedInputWriter);
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("1,Hello streaming".getBytes());
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("2,Welcome to streaming".getBytes());
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        fetchTransactionBatch.close();
        TransactionBatch fetchTransactionBatch2 = newConnection.fetchTransactionBatch(10, delimitedInputWriter);
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.write("3,Hello streaming - once again".getBytes());
        fetchTransactionBatch2.commit();
        checkDataWritten(partLoc, 1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.write("4,Welcome to streaming - once again".getBytes());
        fetchTransactionBatch2.commit();
        checkDataWritten(partLoc, 1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch2.getCurrentTransactionState());
        fetchTransactionBatch2.close();
        newConnection.close();
    }

    @Test
    public void testInterleavedTransactionBatchCommits() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        DelimitedInputWriter delimitedInputWriter = new DelimitedInputWriter(fieldNames, ",", hiveEndPoint);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(10, delimitedInputWriter);
        fetchTransactionBatch.beginNextTransaction();
        TransactionBatch fetchTransactionBatch2 = newConnection.fetchTransactionBatch(10, new DelimitedInputWriter(fieldNames, ",", hiveEndPoint));
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch.write("1,Hello streaming".getBytes());
        fetchTransactionBatch2.write("3,Hello streaming - once again".getBytes());
        checkNothingWritten(partLoc);
        fetchTransactionBatch2.commit();
        checkDataWritten(partLoc, 11L, 20L, 1, 1, "{3, Hello streaming - once again}");
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 20L, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("2,Welcome to streaming".getBytes());
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.write("4,Welcome to streaming - once again".getBytes());
        checkDataWritten(partLoc, 1L, 20L, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
        fetchTransactionBatch.commit();
        checkDataWritten(partLoc, 1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
        fetchTransactionBatch2.commit();
        checkDataWritten(partLoc, 1L, 20L, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch.getCurrentTransactionState());
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, fetchTransactionBatch2.getCurrentTransactionState());
        fetchTransactionBatch.close();
        fetchTransactionBatch2.close();
        newConnection.close();
    }

    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        ArrayList<WriterThd> arrayList = new ArrayList(3);
        arrayList.add(new WriterThd(hiveEndPoint, "1,Matrix"));
        arrayList.add(new WriterThd(hiveEndPoint, "2,Gandhi"));
        arrayList.add(new WriterThd(hiveEndPoint, "3,Silence"));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((WriterThd) it.next()).start();
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((WriterThd) it2.next()).join();
        }
        for (WriterThd writerThd : arrayList) {
            if (writerThd.error != null) {
                Assert.assertFalse("Writer thread" + writerThd.getName() + " died: " + writerThd.error.getMessage() + " See log file for stack trace", true);
            }
        }
    }

    private ArrayList<SampleRec> dumpBucket(Path path) throws IOException {
        Reader createReader = OrcFile.createReader(path, OrcFile.readerOptions(this.conf).filesystem(FileSystem.getLocal(new Configuration())));
        org.apache.hadoop.hive.ql.io.orc.RecordReader rows = createReader.rows();
        StructObjectInspector objectInspector = createReader.getObjectInspector();
        System.out.format("Found Bucket File : %s \n", path.getName());
        ArrayList<SampleRec> arrayList = new ArrayList<>();
        while (rows.hasNext()) {
            arrayList.add((SampleRec) deserializeDeltaFileRow(rows.next((Object) null), objectInspector)[5]);
        }
        return arrayList;
    }

    private static Object[] deserializeDeltaFileRow(Object obj, StructObjectInspector structObjectInspector) {
        List allStructFieldRefs = structObjectInspector.getAllStructFieldRefs();
        WritableIntObjectInspector fieldObjectInspector = ((StructField) allStructFieldRefs.get(0)).getFieldObjectInspector();
        WritableLongObjectInspector fieldObjectInspector2 = ((StructField) allStructFieldRefs.get(1)).getFieldObjectInspector();
        WritableIntObjectInspector fieldObjectInspector3 = ((StructField) allStructFieldRefs.get(2)).getFieldObjectInspector();
        WritableLongObjectInspector fieldObjectInspector4 = ((StructField) allStructFieldRefs.get(3)).getFieldObjectInspector();
        WritableLongObjectInspector fieldObjectInspector5 = ((StructField) allStructFieldRefs.get(4)).getFieldObjectInspector();
        StructObjectInspector fieldObjectInspector6 = ((StructField) allStructFieldRefs.get(5)).getFieldObjectInspector();
        return new Object[]{Integer.valueOf(fieldObjectInspector.get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(0)))), Long.valueOf(fieldObjectInspector2.get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(1)))), Integer.valueOf(fieldObjectInspector3.get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(2)))), Long.valueOf(fieldObjectInspector4.get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(3)))), Long.valueOf(fieldObjectInspector5.get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(4)))), deserializeInner(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(5)), fieldObjectInspector6)};
    }

    private static SampleRec deserializeInner(Object obj, StructObjectInspector structObjectInspector) {
        List allStructFieldRefs = structObjectInspector.getAllStructFieldRefs();
        return new SampleRec(((StructField) allStructFieldRefs.get(0)).getFieldObjectInspector().getPrimitiveJavaObject(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(0))), ((StructField) allStructFieldRefs.get(1)).getFieldObjectInspector().get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(1))), ((StructField) allStructFieldRefs.get(2)).getFieldObjectInspector().getPrimitiveJavaObject(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(2))));
    }

    @Test
    public void testBucketing() throws Exception {
        String str = "UT_" + Thread.currentThread().getName();
        dropDB(this.msClient, dbName3);
        dropDB(this.msClient, dbName4);
        String replaceAll = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] split = "key1,key2,data".split(",");
        createDbAndTable(this.driver, dbName3, tblName3, null, split, "string,int,string".split(","), "key1,key2".split(","), null, replaceAll, 4);
        String replaceAll2 = (this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] split2 = "key3,key4,data2".split(",");
        createDbAndTable(this.driver, dbName4, tblName4, null, split2, "string,int,string".split(","), "key3,key4".split(","), null, replaceAll2, 4);
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, (List) null);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, str);
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, new DelimitedInputWriter(split, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("name0,1,Hello streaming".getBytes());
        fetchTransactionBatch.write("name2,2,Welcome to streaming".getBytes());
        fetchTransactionBatch.write("name4,2,more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name5,2,even more Streaming unlimited".getBytes());
        fetchTransactionBatch.commit();
        HiveEndPoint hiveEndPoint2 = new HiveEndPoint(this.metaStoreURI, dbName4, tblName4, (List) null);
        TransactionBatch fetchTransactionBatch2 = hiveEndPoint2.newConnection(false, str).fetchTransactionBatch(2, new DelimitedInputWriter(split2, ",", hiveEndPoint2, newConnection));
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.write("name5,2,fact3".getBytes());
        fetchTransactionBatch2.write("name8,2,fact3".getBytes());
        fetchTransactionBatch2.write("name0,1,fact1".getBytes());
        fetchTransactionBatch2.commit();
        HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets = dumpAllBuckets(replaceAll, tblName3);
        HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets2 = dumpAllBuckets(replaceAll2, tblName4);
        System.err.println("\n  Table 1");
        System.err.println(dumpAllBuckets);
        System.err.println("\n  Table 2");
        System.err.println(dumpAllBuckets2);
        Assert.assertEquals("number of buckets does not match expectation", dumpAllBuckets.values().size(), 3L);
        Assert.assertEquals("records in bucket does not match expectation", dumpAllBuckets.get(0).size(), 2L);
        Assert.assertEquals("records in bucket does not match expectation", dumpAllBuckets.get(1).size(), 1L);
        Assert.assertTrue("bucket 2 shouldn't have been created", dumpAllBuckets.get(2) == null);
        Assert.assertEquals("records in bucket does not match expectation", dumpAllBuckets.get(3).size(), 1L);
    }

    private void runCmdOnDriver(String str) throws QueryFailedException {
        Assert.assertTrue(str + " failed", runDDL(this.driver, str));
    }

    @Test
    public void testFileDump() throws Exception {
        String str = "UT_" + Thread.currentThread().getName();
        dropDB(this.msClient, dbName3);
        dropDB(this.msClient, dbName4);
        String replaceAll = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] split = "key1,key2,data".split(",");
        createDbAndTable(this.driver, dbName3, tblName3, null, split, "string,int,string".split(","), "key1,key2".split(","), null, replaceAll, 4);
        String replaceAll2 = (this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] split2 = "key3,key4,data2".split(",");
        createDbAndTable(this.driver, dbName4, tblName4, null, split2, "string,int,string".split(","), "key3,key4".split(","), null, replaceAll2, 4);
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, (List) null);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, str);
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, new DelimitedInputWriter(split, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("name0,1,Hello streaming".getBytes());
        fetchTransactionBatch.write("name2,2,Welcome to streaming".getBytes());
        fetchTransactionBatch.write("name4,2,more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name5,2,even more Streaming unlimited".getBytes());
        fetchTransactionBatch.commit();
        PrintStream printStream = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream));
        FileDump.main(new String[]{replaceAll}, true);
        System.err.flush();
        System.setErr(printStream);
        String str2 = new String(byteArrayOutputStream.toByteArray());
        Assert.assertEquals(false, Boolean.valueOf(str2.contains("file(s) are corrupted")));
        Assert.assertEquals(false, Boolean.valueOf(str2.contains("is still open for writes.")));
        HiveEndPoint hiveEndPoint2 = new HiveEndPoint(this.metaStoreURI, dbName4, tblName4, (List) null);
        TransactionBatch fetchTransactionBatch2 = hiveEndPoint2.newConnection(false, str).fetchTransactionBatch(2, new DelimitedInputWriter(split2, ",", hiveEndPoint2));
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.write("name5,2,fact3".getBytes());
        fetchTransactionBatch2.write("name8,2,fact3".getBytes());
        fetchTransactionBatch2.write("name0,1,fact1".getBytes());
        fetchTransactionBatch2.commit();
        PrintStream printStream2 = System.err;
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream2));
        FileDump.main(new String[]{replaceAll}, true);
        System.out.flush();
        System.err.flush();
        System.setErr(printStream2);
        String str3 = new String(byteArrayOutputStream2.toByteArray());
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("Exception")));
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("file(s) are corrupted")));
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("is still open for writes.")));
    }

    @Test
    public void testFileDumpCorruptDataFiles() throws Exception {
        dropDB(this.msClient, dbName3);
        String replaceAll = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] split = "key1,key2,data".split(",");
        createDbAndTable(this.driver, dbName3, tblName3, null, split, "string,int,string".split(","), "key1,key2".split(","), null, replaceAll, 4);
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, (List) null);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, new DelimitedInputWriter(split, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("name0,1,Hello streaming".getBytes());
        fetchTransactionBatch.write("name2,2,Welcome to streaming".getBytes());
        fetchTransactionBatch.write("name4,2,more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name5,2,even more Streaming unlimited".getBytes());
        fetchTransactionBatch.commit();
        Path path = new Path(replaceAll);
        for (String str : FileDump.getAllFilesInPath(path, this.conf)) {
            if (str.contains("bucket_00000")) {
                corruptDataFile(str, this.conf, Integer.MIN_VALUE);
            } else if (str.contains("bucket_00001")) {
                corruptDataFile(str, this.conf, -1);
            } else if (str.contains("bucket_00002")) {
                Assert.assertFalse("bucket 2 shouldn't have been created", true);
            } else if (str.contains("bucket_00003")) {
                corruptDataFile(str, this.conf, 100);
            }
        }
        PrintStream printStream = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream));
        FileDump.main(new String[]{replaceAll}, true);
        System.err.flush();
        System.setErr(printStream);
        String str2 = new String(byteArrayOutputStream.toByteArray());
        Assert.assertEquals(false, Boolean.valueOf(str2.contains("Exception")));
        Assert.assertEquals(true, Boolean.valueOf(str2.contains("3 file(s) are corrupted")));
        Assert.assertEquals(false, Boolean.valueOf(str2.contains("is still open for writes.")));
        PrintStream printStream2 = System.err;
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream2));
        FileDump.main(new String[]{replaceAll, "--recover", "--skip-dump"}, true);
        System.err.flush();
        System.setErr(printStream2);
        String str3 = new String(byteArrayOutputStream2.toByteArray());
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00000 recovered successfully!")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("No readable footers found. Creating empty orc file.")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00001 recovered successfully!")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00003 recovered successfully!")));
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("Exception")));
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("is still open for writes.")));
        PrintStream printStream3 = System.err;
        ByteArrayOutputStream byteArrayOutputStream3 = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream3));
        FileDump.main(new String[]{replaceAll}, true);
        System.err.flush();
        System.setErr(printStream3);
        String str4 = new String(byteArrayOutputStream3.toByteArray());
        Assert.assertEquals(false, Boolean.valueOf(str4.contains("Exception")));
        Assert.assertEquals(false, Boolean.valueOf(str4.contains("file(s) are corrupted")));
        Assert.assertEquals(false, Boolean.valueOf(str4.contains("is still open for writes.")));
        Iterator it = FileDump.getAllFilesInPath(path, this.conf).iterator();
        while (it.hasNext()) {
            Assert.assertEquals(false, Boolean.valueOf(((String) it.next()).contains("_flush_length")));
        }
        fetchTransactionBatch.close();
    }

    private void corruptDataFile(String str, Configuration configuration, int i) throws Exception {
        Path path = new Path(str);
        Path path2 = new Path(path.getParent(), path.getName() + ".corrupt");
        FileSystem fileSystem = path.getFileSystem(configuration);
        FileStatus fileStatus = fileSystem.getFileStatus(path);
        byte[] bArr = new byte[i == Integer.MIN_VALUE ? 0 : ((int) fileStatus.getLen()) + i];
        FSDataInputStream open = fileSystem.open(path);
        open.readFully(0L, bArr, 0, (int) Math.min(fileStatus.getLen(), bArr.length));
        open.close();
        FSDataOutputStream create = fileSystem.create(path2, true);
        create.write(bArr, 0, bArr.length);
        create.close();
        fileSystem.delete(path, false);
        fileSystem.rename(path2, path);
    }

    @Test
    public void testFileDumpCorruptSideFiles() throws Exception {
        dropDB(this.msClient, dbName3);
        String replaceAll = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] split = "key1,key2,data".split(",");
        createDbAndTable(this.driver, dbName3, tblName3, null, split, "string,int,string".split(","), "key1,key2".split(","), null, replaceAll, 4);
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, (List) null);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, new DelimitedInputWriter(split, ",", hiveEndPoint, newConnection));
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("name0,1,Hello streaming".getBytes());
        fetchTransactionBatch.write("name2,2,Welcome to streaming".getBytes());
        fetchTransactionBatch.write("name4,2,more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name5,2,even more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name6,3,aHello streaming".getBytes());
        fetchTransactionBatch.commit();
        HashMap hashMap = new HashMap();
        recordOffsets(this.conf, replaceAll, hashMap);
        fetchTransactionBatch.beginNextTransaction();
        fetchTransactionBatch.write("name01,11,-Hello streaming".getBytes());
        fetchTransactionBatch.write("name21,21,-Welcome to streaming".getBytes());
        fetchTransactionBatch.write("name41,21,-more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name51,21,-even more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name02,12,--Hello streaming".getBytes());
        fetchTransactionBatch.write("name22,22,--Welcome to streaming".getBytes());
        fetchTransactionBatch.write("name42,22,--more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name52,22,--even more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name7,4,aWelcome to streaming".getBytes());
        fetchTransactionBatch.write("name8,5,amore Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name9,6,aeven more Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name10,7,bHello streaming".getBytes());
        fetchTransactionBatch.write("name11,8,bWelcome to streaming".getBytes());
        fetchTransactionBatch.write("name12,9,bmore Streaming unlimited".getBytes());
        fetchTransactionBatch.write("name13,10,beven more Streaming unlimited".getBytes());
        fetchTransactionBatch.commit();
        recordOffsets(this.conf, replaceAll, hashMap);
        Path path = new Path(replaceAll);
        for (String str : FileDump.getAllFilesInPath(path, this.conf)) {
            if (str.contains("bucket_00000")) {
                corruptSideFile(str, this.conf, hashMap, "bucket_00000", -1);
            } else if (str.contains("bucket_00001")) {
                corruptSideFile(str, this.conf, hashMap, "bucket_00001", 0);
            } else if (str.contains("bucket_00002")) {
                corruptSideFile(str, this.conf, hashMap, "bucket_00002", 3);
            } else if (str.contains("bucket_00003")) {
                corruptSideFile(str, this.conf, hashMap, "bucket_00003", 10);
            }
        }
        PrintStream printStream = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream));
        FileDump.main(new String[]{replaceAll}, true);
        System.err.flush();
        System.setErr(printStream);
        String str2 = new String(byteArrayOutputStream.toByteArray());
        Assert.assertEquals(true, Boolean.valueOf(str2.contains("bucket_00000_flush_length [length: 11")));
        Assert.assertEquals(true, Boolean.valueOf(str2.contains("bucket_00001_flush_length [length: 0")));
        Assert.assertEquals(true, Boolean.valueOf(str2.contains("bucket_00002_flush_length [length: 24")));
        Assert.assertEquals(true, Boolean.valueOf(str2.contains("bucket_00003_flush_length [length: 80")));
        Assert.assertEquals(false, Boolean.valueOf(str2.contains("Exception")));
        Assert.assertEquals(true, Boolean.valueOf(str2.contains("4 file(s) are corrupted")));
        Assert.assertEquals(false, Boolean.valueOf(str2.contains("is still open for writes.")));
        PrintStream printStream2 = System.err;
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream2));
        FileDump.main(new String[]{replaceAll, "--recover", "--skip-dump"}, true);
        System.err.flush();
        System.setErr(printStream2);
        String str3 = new String(byteArrayOutputStream2.toByteArray());
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00000 recovered successfully!")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00001 recovered successfully!")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00002 recovered successfully!")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("bucket_00003 recovered successfully!")));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("Readable footerOffsets: " + hashMap.get("bucket_00000").toString())));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("Readable footerOffsets: " + hashMap.get("bucket_00001").toString())));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("Readable footerOffsets: " + hashMap.get("bucket_00002").toString())));
        Assert.assertEquals(true, Boolean.valueOf(str3.contains("Readable footerOffsets: " + hashMap.get("bucket_00003").toString())));
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("Exception")));
        Assert.assertEquals(false, Boolean.valueOf(str3.contains("is still open for writes.")));
        PrintStream printStream3 = System.err;
        ByteArrayOutputStream byteArrayOutputStream3 = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream3));
        FileDump.main(new String[]{replaceAll}, true);
        System.err.flush();
        System.setErr(printStream3);
        String str4 = new String(byteArrayOutputStream3.toByteArray());
        Assert.assertEquals(false, Boolean.valueOf(str4.contains("Exception")));
        Assert.assertEquals(false, Boolean.valueOf(str4.contains("file(s) are corrupted")));
        Assert.assertEquals(false, Boolean.valueOf(str4.contains("is still open for writes.")));
        Iterator it = FileDump.getAllFilesInPath(path, this.conf).iterator();
        while (it.hasNext()) {
            Assert.assertEquals(false, Boolean.valueOf(((String) it.next()).contains("_flush_length")));
        }
        fetchTransactionBatch.close();
    }

    private void corruptSideFile(String str, HiveConf hiveConf, Map<String, List<Long>> map, String str2, int i) throws IOException {
        Path sideFile = OrcAcidUtils.getSideFile(new Path(str));
        Path path = new Path(sideFile.getParent(), sideFile.getName() + ".corrupt");
        FileSystem fileSystem = sideFile.getFileSystem(hiveConf);
        List<Long> list = map.get(str2);
        long longValue = list.get(list.size() - 1).longValue();
        FSDataOutputStream create = fileSystem.create(path, true);
        if (i < 0) {
            byte[] longToBytes = longToBytes(longValue);
            for (int i2 = 0; i2 < list.size() - 1; i2++) {
                create.writeLong(list.get(i2).longValue());
            }
            create.write(longToBytes, 0, 3);
        } else if (i > 0) {
            int min = Math.min(list.size(), i);
            for (int i3 = 0; i3 < min; i3++) {
                create.writeLong(list.get(i3).longValue());
            }
            int i4 = i - min;
            for (int i5 = 0; i5 < i4; i5++) {
                create.writeLong(longValue + ((i5 + 1) * 100));
            }
        }
        create.close();
        fileSystem.delete(sideFile, false);
        fileSystem.rename(path, sideFile);
    }

    private byte[] longToBytes(long j) {
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putLong(j);
        return allocate.array();
    }

    private void recordOffsets(HiveConf hiveConf, String str, Map<String, List<Long>> map) throws IOException {
        for (String str2 : FileDump.getAllFilesInPath(new Path(str), hiveConf)) {
            Path path = new Path(str2);
            long len = path.getFileSystem(hiveConf).getFileStatus(path).getLen();
            if (str2.contains("bucket_00000")) {
                if (map.containsKey("bucket_00000")) {
                    List<Long> list = map.get("bucket_00000");
                    list.add(Long.valueOf(len));
                    map.put("bucket_00000", list);
                } else {
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(Long.valueOf(len));
                    map.put("bucket_00000", arrayList);
                }
            } else if (str2.contains("bucket_00001")) {
                if (map.containsKey("bucket_00001")) {
                    List<Long> list2 = map.get("bucket_00001");
                    list2.add(Long.valueOf(len));
                    map.put("bucket_00001", list2);
                } else {
                    ArrayList arrayList2 = new ArrayList();
                    arrayList2.add(Long.valueOf(len));
                    map.put("bucket_00001", arrayList2);
                }
            } else if (str2.contains("bucket_00002")) {
                if (map.containsKey("bucket_00002")) {
                    List<Long> list3 = map.get("bucket_00002");
                    list3.add(Long.valueOf(len));
                    map.put("bucket_00002", list3);
                } else {
                    ArrayList arrayList3 = new ArrayList();
                    arrayList3.add(Long.valueOf(len));
                    map.put("bucket_00002", arrayList3);
                }
            } else if (str2.contains("bucket_00003")) {
                if (map.containsKey("bucket_00003")) {
                    List<Long> list4 = map.get("bucket_00003");
                    list4.add(Long.valueOf(len));
                    map.put("bucket_00003", list4);
                } else {
                    ArrayList arrayList4 = new ArrayList();
                    arrayList4.add(Long.valueOf(len));
                    map.put("bucket_00003", arrayList4);
                }
            }
        }
    }

    @Test
    public void testErrorHandling() throws Exception {
        String str = "UT_" + Thread.currentThread().getName();
        runCmdOnDriver("create database testErrors");
        runCmdOnDriver("use testErrors");
        runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
        HiveEndPoint hiveEndPoint = new HiveEndPoint(this.metaStoreURI, "testErrors", "T", (List) null);
        StreamingConnection newConnection = hiveEndPoint.newConnection(false, str);
        FaultyWriter faultyWriter = new FaultyWriter(new DelimitedInputWriter("a,b".split(","), ",", hiveEndPoint, newConnection));
        TransactionBatch fetchTransactionBatch = newConnection.fetchTransactionBatch(2, faultyWriter);
        fetchTransactionBatch.close();
        fetchTransactionBatch.heartbeat();
        fetchTransactionBatch.abort();
        GetOpenTxnsInfoResponse showTxns = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 2L, showTxns.getTxn_high_water_mark());
        List open_txns = showTxns.getOpen_txns();
        Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ((TxnInfo) open_txns.get(0)).getState());
        Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ((TxnInfo) open_txns.get(1)).getState());
        IllegalStateException illegalStateException = null;
        try {
            fetchTransactionBatch.beginNextTransaction();
        } catch (IllegalStateException e) {
            illegalStateException = e;
        }
        Assert.assertTrue("beginNextTransaction() should have failed", illegalStateException != null && illegalStateException.getMessage().contains("has been closed()"));
        IllegalStateException illegalStateException2 = null;
        try {
            fetchTransactionBatch.write("name0,1,Hello streaming".getBytes());
        } catch (IllegalStateException e2) {
            illegalStateException2 = e2;
        }
        Assert.assertTrue("write()  should have failed", illegalStateException2 != null && illegalStateException2.getMessage().contains("has been closed()"));
        IllegalStateException illegalStateException3 = null;
        try {
            fetchTransactionBatch.commit();
        } catch (IllegalStateException e3) {
            illegalStateException3 = e3;
        }
        Assert.assertTrue("commit() should have failed", illegalStateException3 != null && illegalStateException3.getMessage().contains("has been closed()"));
        TransactionBatch fetchTransactionBatch2 = newConnection.fetchTransactionBatch(2, faultyWriter);
        fetchTransactionBatch2.beginNextTransaction();
        fetchTransactionBatch2.write("name2,2,Welcome to streaming".getBytes());
        fetchTransactionBatch2.write("name4,2,more Streaming unlimited".getBytes());
        fetchTransactionBatch2.write("name5,2,even more Streaming unlimited".getBytes());
        fetchTransactionBatch2.commit();
        Exception exc = null;
        fetchTransactionBatch2.beginNextTransaction();
        faultyWriter.enableErrors();
        try {
            fetchTransactionBatch2.write("name6,2,Doh!".getBytes());
        } catch (StreamingIOFailure e4) {
            exc = e4;
            fetchTransactionBatch2.getCurrentTransactionState();
            fetchTransactionBatch2.getCurrentTxnId();
        }
        Assert.assertTrue("Wrong exception: " + (exc != null ? exc.getMessage() : "?"), exc != null && exc.getMessage().contains("Simulated fault occurred"));
        IllegalStateException illegalStateException4 = null;
        try {
            fetchTransactionBatch2.commit();
        } catch (IllegalStateException e5) {
            illegalStateException4 = e5;
        }
        Assert.assertTrue("commit() should have failed", illegalStateException4 != null && illegalStateException4.getMessage().contains("has been closed()"));
        GetOpenTxnsInfoResponse showTxns2 = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 4L, showTxns2.getTxn_high_water_mark());
        List open_txns2 = showTxns2.getOpen_txns();
        Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ((TxnInfo) open_txns2.get(0)).getState());
        Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ((TxnInfo) open_txns2.get(1)).getState());
        Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ((TxnInfo) open_txns2.get(2)).getState());
        faultyWriter.disableErrors();
        TransactionBatch fetchTransactionBatch3 = newConnection.fetchTransactionBatch(2, faultyWriter);
        fetchTransactionBatch3.beginNextTransaction();
        fetchTransactionBatch3.write("name2,2,Welcome to streaming".getBytes());
        faultyWriter.enableErrors();
        Exception exc2 = null;
        try {
            fetchTransactionBatch3.commit();
        } catch (StreamingIOFailure e6) {
            exc2 = e6;
        }
        Assert.assertTrue("Wrong exception: " + (exc2 != null ? exc2.getMessage() : "?"), exc2 != null && exc2.getMessage().contains("Simulated fault occurred"));
        GetOpenTxnsInfoResponse showTxns3 = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 6L, showTxns3.getTxn_high_water_mark());
        List open_txns3 = showTxns3.getOpen_txns();
        Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ((TxnInfo) open_txns3.get(3)).getState());
        Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ((TxnInfo) open_txns3.get(4)).getState());
        fetchTransactionBatch3.abort();
    }

    private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String str, String str2) throws IOException {
        HashMap<Integer, ArrayList<SampleRec>> hashMap = new HashMap<>();
        for (File file : new File(str + "/" + str2).listFiles()) {
            if (file.getName().startsWith("delta")) {
                for (File file2 : file.listFiles()) {
                    if (!file2.toString().endsWith("length")) {
                        hashMap.put(getBucketNumber(file2), dumpBucket(new Path(file2.toString())));
                    }
                }
            }
        }
        return hashMap;
    }

    private Integer getBucketNumber(File file) {
        String name = file.getName();
        return Integer.valueOf(Integer.parseInt(name.substring(name.indexOf(95) + 1, name.length())));
    }

    public static void dropDB(IMetaStoreClient iMetaStoreClient, String str) {
        try {
            Iterator it = iMetaStoreClient.listTableNamesByFilter(str, "", (short) -1).iterator();
            while (it.hasNext()) {
                iMetaStoreClient.dropTable(str, (String) it.next(), true, true);
            }
            iMetaStoreClient.dropDatabase(str);
        } catch (TException e) {
        }
    }

    public static Path createDbAndTable(Driver driver, String str, String str2, List<String> list, String[] strArr, String[] strArr2, String[] strArr3, String[] strArr4, String str3, int i) throws Exception {
        String str4 = "raw://" + new Path(str3).toUri().toString();
        String str5 = str4 + "/" + str2;
        runDDL(driver, "create database IF NOT EXISTS " + str + " location '" + str4 + "'");
        runDDL(driver, "use " + str);
        runDDL(driver, "create table " + str2 + " ( " + getTableColumnsStr(strArr, strArr2) + " )" + getPartitionStmtStr(strArr4) + " clustered by ( " + join(strArr3, ",") + " ) into " + i + " buckets  stored as orc  location '" + str5 + "' TBLPROPERTIES ('transactional'='true') ");
        return (strArr4 == null || strArr4.length == 0) ? new Path(str5) : addPartition(driver, str2, list, strArr4);
    }

    private static Path addPartition(Driver driver, String str, List<String> list, String[] strArr) throws QueryFailedException, CommandNeedRetryException, IOException {
        String partsSpec = getPartsSpec(strArr, list);
        runDDL(driver, "alter table " + str + " add partition ( " + partsSpec + " )");
        return getPartitionPath(driver, str, partsSpec);
    }

    private static Path getPartitionPath(Driver driver, String str, String str2) throws CommandNeedRetryException, IOException {
        ArrayList<String> queryTable = queryTable(driver, "describe extended " + str + " PARTITION (" + str2 + ")");
        String str3 = queryTable.get(queryTable.size() - 1);
        int indexOf = str3.indexOf("location:") + "location:".length();
        return new Path(str3.substring(indexOf, str3.indexOf(",", indexOf)));
    }

    private static String getTableColumnsStr(String[] strArr, String[] strArr2) {
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i < strArr.length; i++) {
            stringBuffer.append(strArr[i] + " " + strArr2[i]);
            if (i < strArr.length - 1) {
                stringBuffer.append(",");
            }
        }
        return stringBuffer.toString();
    }

    private static String getTablePartsStr(String[] strArr) {
        if (strArr == null || strArr.length == 0) {
            return "";
        }
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i < strArr.length; i++) {
            stringBuffer.append(strArr[i] + " string");
            if (i < strArr.length - 1) {
                stringBuffer.append(",");
            }
        }
        return stringBuffer.toString();
    }

    private static String getPartsSpec(String[] strArr, List<String> list) {
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i < list.size(); i++) {
            stringBuffer.append(strArr[i] + " = '" + list.get(i) + "'");
            if (i < list.size() - 1) {
                stringBuffer.append(",");
            }
        }
        return stringBuffer.toString();
    }

    private static String join(String[] strArr, String str) {
        if (strArr == null) {
            return null;
        }
        StringBuffer stringBuffer = new StringBuffer();
        boolean z = true;
        for (String str2 : strArr) {
            if (z) {
                z = false;
            } else {
                stringBuffer.append(str);
            }
            stringBuffer.append(str2.toString());
        }
        return stringBuffer.toString();
    }

    private static String getPartitionStmtStr(String[] strArr) {
        return (strArr == null || strArr.length == 0) ? "" : " partitioned by (" + getTablePartsStr(strArr) + " )";
    }

    private static boolean runDDL(Driver driver, String str) throws QueryFailedException {
        CommandProcessorResponse run;
        LOG.debug(str);
        System.out.println(str);
        for (int i = 0; i <= 1; i++) {
            try {
                run = driver.run(str);
            } catch (CommandNeedRetryException e) {
                if (i == 1) {
                    throw new QueryFailedException(str, e);
                }
            }
            if (run.getResponseCode() == 0) {
                return true;
            }
            LOG.error("Statement: " + str + " failed: " + run);
        }
        return false;
    }

    public static ArrayList<String> queryTable(Driver driver, String str) throws CommandNeedRetryException, IOException {
        driver.run(str);
        ArrayList<String> arrayList = new ArrayList<>();
        driver.getResults(arrayList);
        if (arrayList.isEmpty()) {
            System.err.println(driver.getErrorMsg());
        }
        return arrayList;
    }
}
