package org.apache.hive.hcatalog.streaming.mutate;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert;
import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils;
import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/hive/hcatalog/streaming/mutate/TestMutations.class */
/**
 * Tests for the mutation streaming API: each test creates a table through the metastore client,
 * drives a {@link MutatorClient} / {@link Transaction} / {@link MutatorCoordinator} cycle against
 * it and then checks the outcome through {@link StreamingAssert}.
 *
 * <p>The database named {@code "testing"} is dropped and re-created before every test, inside the
 * per-test {@link TemporaryFolder}.
 */
public class TestMutations {
    private static final List<String> EUROPE_FRANCE = Arrays.asList("Europe", "France");
    private static final List<String> EUROPE_UK = Arrays.asList("Europe", "UK");
    private static final List<String> ASIA_INDIA = Arrays.asList("Asia", "India");

    /** Bucket column indexes handed to {@link ReflectiveMutatorFactory}; index 0 is the first declared column ({@code id}). */
    private static final int[] BUCKET_COLUMN_INDEXES = {0};

    /**
     * Record-identifier column argument handed to {@link ReflectiveMutatorFactory}.
     * It must not be reused as test data (record ids, list indexes, expected sizes).
     */
    private static final int RECORD_ID_COLUMN = 2;

    @Rule
    public TemporaryFolder warehouseFolder = new TemporaryFolder();

    private final StreamingTestUtils testUtils = new StreamingTestUtils();

    // Never assigned in this class, so it is always null when passed on.
    // NOTE(review): presumably null selects an embedded/local metastore - confirm against StreamingTestUtils.
    private String metaStoreUri;

    private final HiveConf conf;
    private final IMetaStoreClient metaStoreClient;
    private final StreamingAssert.Factory assertionFactory;

    private Database database;
    private StreamingTestUtils.TableBuilder partitionedTableBuilder;
    private StreamingTestUtils.TableBuilder unpartitionedTableBuilder;

    /**
     * Builds the Hive configuration, prepares the transaction database and opens the metastore
     * client shared by the tests of this instance.
     *
     * @throws Exception if any of the test utilities fail
     */
    public TestMutations() throws Exception {
        this.conf = this.testUtils.newHiveConf(this.metaStoreUri);
        this.testUtils.prepareTransactionDatabase(this.conf);
        this.metaStoreClient = this.testUtils.newMetaStoreClient(this.conf);
        this.assertionFactory = new StreamingAssert.Factory(this.metaStoreClient, this.conf);
    }

    /**
     * Re-creates the {@code testing} database and prepares (but does not create) the two table
     * definitions used by the tests.
     */
    @Before
    public void setup() throws Exception {
        this.database = StreamingTestUtils.databaseBuilder(this.warehouseFolder.getRoot())
            .name("testing")
            .dropAndCreate(this.metaStoreClient);

        // NOTE(review): the bucket column is declared as "string", which is a column *type* here,
        // not one of the column names ("id", "msg"); BUCKET_COLUMN_INDEXES points at "id".
        // Left untouched to preserve behaviour - confirm whether "id" was intended.
        this.partitionedTableBuilder = StreamingTestUtils.tableBuilder(this.database)
            .name("partitioned")
            .addColumn("id", "int")
            .addColumn("msg", "string")
            .partitionKeys("continent", "country")
            .bucketCols(Collections.singletonList("string"));

        this.unpartitionedTableBuilder = StreamingTestUtils.tableBuilder(this.database)
            .name("unpartitioned")
            .addColumn("id", "int")
            .addColumn("msg", "string")
            .bucketCols(Collections.singletonList("string"));
    }

    /** An empty transaction on a partitioned table can be committed. */
    @Test
    public void testTransactionBatchEmptyCommitPartitioned() throws Exception {
        Table table = this.partitionedTableBuilder.addPartition(ASIA_INDIA).create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, true);

        Transaction transaction = client.newTransaction();
        transaction.begin();
        transaction.commit();

        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.COMMITTED));
        client.close();
    }

    /** An empty transaction on an unpartitioned table can be committed. */
    @Test
    public void testTransactionBatchEmptyCommitUnpartitioned() throws Exception {
        Table table = this.unpartitionedTableBuilder.create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, false);

        Transaction transaction = client.newTransaction();
        transaction.begin();
        transaction.commit();

        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.COMMITTED));
        client.close();
    }

    /** A transaction on a partitioned table can be aborted after a coordinator was opened and closed without writes. */
    @Test
    public void testTransactionBatchEmptyAbortPartitioned() throws Exception {
        Table table = this.partitionedTableBuilder.addPartition(ASIA_INDIA).create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, true);

        Transaction transaction = client.newTransaction();
        AcidTable destination = firstTable(client);
        transaction.begin();

        // Open a coordinator and close it straight away: nothing is written.
        newCoordinator(destination, newMutatorFactory()).close();

        transaction.abort();
        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.ABORTED));
        client.close();
    }

    /**
     * A transaction on an unpartitioned table can be aborted after a coordinator was opened and
     * closed without writes. (The method name keeps its historical spelling so that existing test
     * reports and filters continue to match.)
     */
    @Test
    public void testTransactionBatchEmptyAbortUnartitioned() throws Exception {
        Table table = this.unpartitionedTableBuilder.create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, false);

        Transaction transaction = client.newTransaction();
        AcidTable destination = firstTable(client);
        transaction.begin();

        // Open a coordinator and close it straight away: nothing is written.
        newCoordinator(destination, newMutatorFactory()).close();

        transaction.abort();
        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.ABORTED));
        client.close();
    }

    /** A single insert into a pre-existing partition is readable after commit. */
    @Test
    public void testTransactionBatchCommitPartitioned() throws Exception {
        Table table = this.partitionedTableBuilder.addPartition(ASIA_INDIA).create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, true);

        Transaction transaction = client.newTransaction();
        AcidTable destination = firstTable(client);
        transaction.begin();

        ReflectiveMutatorFactory mutatorFactory = newMutatorFactory();
        MutatorCoordinator coordinator = newCoordinator(destination, mutatorFactory);
        BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destination.getTotalBuckets());
        coordinator.insert(ASIA_INDIA, bucketed(bucketIdResolver, 1, "Hello streaming"));
        coordinator.close();
        transaction.commit();

        StreamingAssert streamingAssertions = this.assertionFactory.newStreamingAssert(table, ASIA_INDIA);
        streamingAssertions.assertMinTransactionId(1L);
        streamingAssertions.assertMaxTransactionId(1L);
        streamingAssertions.assertExpectedFileCount(1);

        List<StreamingAssert.Record> records = streamingAssertions.readRecords();
        Assert.assertThat(records.size(), CoreMatchers.is(1));
        assertRecord(records.get(0), "{1, Hello streaming}", new RecordIdentifier(1L, 0, 0L));

        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.COMMITTED));
        client.close();
    }

    /** Inserts spread over three partitions within one transaction land in their respective partitions. */
    @Test
    public void testMulti() throws Exception {
        Table table = this.partitionedTableBuilder.addPartition(ASIA_INDIA).create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, true);

        Transaction transaction = client.newTransaction();
        AcidTable destination = firstTable(client);
        transaction.begin();

        ReflectiveMutatorFactory mutatorFactory = newMutatorFactory();
        MutatorCoordinator coordinator = newCoordinator(destination, mutatorFactory);
        BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destination.getTotalBuckets());

        MutableRecord asiaIndiaRecord = bucketed(bucketIdResolver, 1, "Hello streaming");
        MutableRecord europeUkRecord = bucketed(bucketIdResolver, 2, "Hello streaming");
        MutableRecord europeFranceRecord1 = bucketed(bucketIdResolver, 3, "Hello streaming");
        MutableRecord europeFranceRecord2 = bucketed(bucketIdResolver, 4, "Bonjour streaming");

        coordinator.insert(ASIA_INDIA, asiaIndiaRecord);
        coordinator.insert(EUROPE_UK, europeUkRecord);
        coordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
        coordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
        coordinator.close();
        transaction.commit();

        // ASIA_INDIA
        StreamingAssert indiaAssertions = this.assertionFactory.newStreamingAssert(table, ASIA_INDIA);
        indiaAssertions.assertMinTransactionId(1L);
        indiaAssertions.assertMaxTransactionId(1L);
        indiaAssertions.assertExpectedFileCount(1);
        List<StreamingAssert.Record> indiaRecords = indiaAssertions.readRecords();
        Assert.assertThat(indiaRecords.size(), CoreMatchers.is(1));
        assertRecord(indiaRecords.get(0), "{1, Hello streaming}", new RecordIdentifier(1L, 0, 0L));

        // EUROPE_UK
        StreamingAssert ukAssertions = this.assertionFactory.newStreamingAssert(table, EUROPE_UK);
        ukAssertions.assertMinTransactionId(1L);
        ukAssertions.assertMaxTransactionId(1L);
        ukAssertions.assertExpectedFileCount(1);
        List<StreamingAssert.Record> ukRecords = ukAssertions.readRecords();
        Assert.assertThat(ukRecords.size(), CoreMatchers.is(1));
        assertRecord(ukRecords.get(0), "{2, Hello streaming}", new RecordIdentifier(1L, 0, 0L));

        // EUROPE_FRANCE
        StreamingAssert franceAssertions = this.assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
        franceAssertions.assertMinTransactionId(1L);
        franceAssertions.assertMaxTransactionId(1L);
        franceAssertions.assertExpectedFileCount(1);
        List<StreamingAssert.Record> franceRecords = franceAssertions.readRecords();
        Assert.assertThat(franceRecords.size(), CoreMatchers.is(2));
        assertRecord(franceRecords.get(0), "{3, Hello streaming}", new RecordIdentifier(1L, 0, 0L));
        assertRecord(franceRecords.get(1), "{4, Bonjour streaming}", new RecordIdentifier(1L, 0, 1L));

        client.close();
    }

    /** A single insert into an unpartitioned table is readable after commit. */
    @Test
    public void testTransactionBatchCommitUnpartitioned() throws Exception {
        Table table = this.unpartitionedTableBuilder.create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, false);

        Transaction transaction = client.newTransaction();
        AcidTable destination = firstTable(client);
        transaction.begin();

        ReflectiveMutatorFactory mutatorFactory = newMutatorFactory();
        MutatorCoordinator coordinator = newCoordinator(destination, mutatorFactory);
        BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destination.getTotalBuckets());
        // An unpartitioned table is addressed with an empty partition value list.
        coordinator.insert(Collections.emptyList(), bucketed(bucketIdResolver, 1, "Hello streaming"));
        coordinator.close();
        transaction.commit();

        StreamingAssert streamingAssertions = this.assertionFactory.newStreamingAssert(table);
        streamingAssertions.assertMinTransactionId(1L);
        streamingAssertions.assertMaxTransactionId(1L);
        streamingAssertions.assertExpectedFileCount(1);

        List<StreamingAssert.Record> records = streamingAssertions.readRecords();
        Assert.assertThat(records.size(), CoreMatchers.is(1));
        assertRecord(records.get(0), "{1, Hello streaming}", new RecordIdentifier(1L, 0, 0L));

        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.COMMITTED));
        client.close();
    }

    /** Records inserted in a transaction that is subsequently aborted are not visible. */
    @Test
    public void testTransactionBatchAbort() throws Exception {
        Table table = this.partitionedTableBuilder.addPartition(ASIA_INDIA).create(this.metaStoreClient);
        MutatorClient client = connectedClient(table, true);

        Transaction transaction = client.newTransaction();
        AcidTable destination = firstTable(client);
        transaction.begin();

        ReflectiveMutatorFactory mutatorFactory = newMutatorFactory();
        MutatorCoordinator coordinator = newCoordinator(destination, mutatorFactory);
        BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destination.getTotalBuckets());

        MutableRecord record1 = bucketed(bucketIdResolver, 1, "Hello streaming");
        MutableRecord record2 = bucketed(bucketIdResolver, 2, "Welcome to streaming");

        coordinator.insert(ASIA_INDIA, record1);
        coordinator.insert(ASIA_INDIA, record2);
        coordinator.close();

        transaction.abort();
        Assert.assertThat(transaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.ABORTED));
        client.close();

        this.assertionFactory.newStreamingAssert(table, ASIA_INDIA).assertNothingWritten();
    }

    /**
     * First transaction inserts six records across three partitions; a second transaction then
     * updates, inserts and deletes, and the merged view of each partition is verified.
     */
    @Test
    public void testUpdatesAndDeletes() throws Exception {
        ReflectiveMutatorFactory mutatorFactory = newMutatorFactory();
        Table table = this.partitionedTableBuilder
            .addPartition(ASIA_INDIA)
            .addPartition(EUROPE_FRANCE)
            .create(this.metaStoreClient);

        // --- Transaction 1: initial inserts ---
        MutatorClient insertClient = connectedClient(table, true);
        Transaction insertTransaction = insertClient.newTransaction();
        AcidTable insertDestination = firstTable(insertClient);
        insertTransaction.begin();

        MutatorCoordinator insertCoordinator = newCoordinator(insertDestination, mutatorFactory);
        BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(insertDestination.getTotalBuckets());

        MutableRecord asiaIndiaRecord1 = bucketed(bucketIdResolver, 1, "Namaste streaming 1");
        MutableRecord asiaIndiaRecord2 = bucketed(bucketIdResolver, 2, "Namaste streaming 2");
        MutableRecord europeUkRecord1 = bucketed(bucketIdResolver, 3, "Hello streaming 1");
        MutableRecord europeUkRecord2 = bucketed(bucketIdResolver, 4, "Hello streaming 2");
        MutableRecord europeFranceRecord1 = bucketed(bucketIdResolver, 5, "Bonjour streaming 1");
        MutableRecord europeFranceRecord2 = bucketed(bucketIdResolver, 6, "Bonjour streaming 2");

        insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
        insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord2);
        insertCoordinator.insert(EUROPE_UK, europeUkRecord1);
        insertCoordinator.insert(EUROPE_UK, europeUkRecord2);
        insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
        insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
        insertCoordinator.close();

        insertTransaction.commit();
        Assert.assertThat(insertTransaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.COMMITTED));
        insertClient.close();

        // --- Transaction 2: updates, one more insert, deletes ---
        MutatorClient mutateClient = connectedClient(table, true);
        Transaction mutateTransaction = mutateClient.newTransaction();
        AcidTable mutateDestination = firstTable(mutateClient);
        mutateTransaction.begin();

        MutatorCoordinator mutateCoordinator = newCoordinator(mutateDestination, mutatorFactory);
        BucketIdResolver mutateBucketIdResolver = mutatorFactory.newBucketIdResolver(mutateDestination.getTotalBuckets());
        MutableRecord asiaIndiaRecord3 = bucketed(mutateBucketIdResolver, 20, "Namaste streaming 3");

        // Updates and deletes address existing rows through the RecordIdentifier written by transaction 1.
        mutateCoordinator.update(ASIA_INDIA,
            new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L, 0, 1L)));
        mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3);
        mutateCoordinator.delete(EUROPE_UK,
            new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L)));
        mutateCoordinator.delete(EUROPE_FRANCE,
            new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L)));
        mutateCoordinator.update(EUROPE_FRANCE,
            new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier(1L, 0, 1L)));
        mutateCoordinator.close();

        mutateTransaction.commit();
        Assert.assertThat(mutateTransaction.getState(), CoreMatchers.is(TransactionBatch.TxnState.COMMITTED));

        // ASIA_INDIA: one untouched row, one updated row, one row inserted by transaction 2.
        StreamingAssert indiaAssertions = this.assertionFactory.newStreamingAssert(table, ASIA_INDIA);
        indiaAssertions.assertMinTransactionId(1L);
        indiaAssertions.assertMaxTransactionId(2L);
        List<StreamingAssert.Record> indiaRecords = indiaAssertions.readRecords();
        Assert.assertThat(indiaRecords.size(), CoreMatchers.is(3));
        assertRecord(indiaRecords.get(0), "{1, Namaste streaming 1}", new RecordIdentifier(1L, 0, 0L));
        assertRecord(indiaRecords.get(1), "{2, UPDATED: Namaste streaming 2}", new RecordIdentifier(1L, 0, 1L));
        assertRecord(indiaRecords.get(2), "{20, Namaste streaming 3}", new RecordIdentifier(2L, 0, 0L));

        // EUROPE_UK: first row deleted, second row remains.
        StreamingAssert ukAssertions = this.assertionFactory.newStreamingAssert(table, EUROPE_UK);
        ukAssertions.assertMinTransactionId(1L);
        ukAssertions.assertMaxTransactionId(2L);
        List<StreamingAssert.Record> ukRecords = ukAssertions.readRecords();
        Assert.assertThat(ukRecords.size(), CoreMatchers.is(1));
        assertRecord(ukRecords.get(0), "{4, Hello streaming 2}", new RecordIdentifier(1L, 0, 1L));

        // EUROPE_FRANCE: first row deleted, second row updated.
        StreamingAssert franceAssertions = this.assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
        franceAssertions.assertMinTransactionId(1L);
        franceAssertions.assertMaxTransactionId(2L);
        List<StreamingAssert.Record> franceRecords = franceAssertions.readRecords();
        Assert.assertThat(franceRecords.size(), CoreMatchers.is(1));
        assertRecord(franceRecords.get(0), "{6, UPDATED: Bonjour streaming 2}", new RecordIdentifier(1L, 0, 1L));

        mutateClient.close();
    }

    /**
     * Builds a {@link MutatorClient} with {@code table} as its only sink table and connects it.
     *
     * @param table the metastore table to register as sink
     * @param createPartitions flag forwarded verbatim to {@code addSinkTable}; the tests pass
     *     {@code true} for the partitioned table and {@code false} for the unpartitioned one
     *     (presumably it controls partition creation - confirm against MutatorClientBuilder)
     * @return the connected client; the caller is responsible for closing it
     */
    private MutatorClient connectedClient(Table table, boolean createPartitions) throws Exception {
        MutatorClient client = new MutatorClientBuilder()
            .addSinkTable(table.getDbName(), table.getTableName(), createPartitions)
            .metaStoreUri(this.metaStoreUri)
            .build();
        client.connect();
        return client;
    }

    /** Returns the first table exposed by {@code client}; every test registers exactly one sink table. */
    private static AcidTable firstTable(MutatorClient client) throws Exception {
        return (AcidTable) client.getTables().get(0);
    }

    /** Creates the mutator factory for {@link MutableRecord} using this class's column constants. */
    private ReflectiveMutatorFactory newMutatorFactory() throws Exception {
        return new ReflectiveMutatorFactory(this.conf, MutableRecord.class, RECORD_ID_COLUMN, BUCKET_COLUMN_INDEXES);
    }

    /** Builds a coordinator writing to {@code table} through {@code mutatorFactory}; the caller closes it. */
    private MutatorCoordinator newCoordinator(AcidTable table, ReflectiveMutatorFactory mutatorFactory)
            throws Exception {
        return new MutatorCoordinatorBuilder()
            .metaStoreUri(this.metaStoreUri)
            .table(table)
            .mutatorFactory(mutatorFactory)
            .build();
    }

    /** Creates a new {@link MutableRecord} with the given id and message and passes it through {@code resolver}. */
    private static MutableRecord bucketed(BucketIdResolver resolver, int id, String msg) throws Exception {
        return (MutableRecord) resolver.attachBucketIdToRecord(new MutableRecord(id, msg));
    }

    /** Asserts the row text first and then the record identifier of a record read back from the table. */
    private static void assertRecord(StreamingAssert.Record actual, String expectedRow,
            RecordIdentifier expectedIdentifier) {
        Assert.assertThat(actual.getRow(), CoreMatchers.is(expectedRow));
        Assert.assertThat(actual.getRecordIdentifier(), CoreMatchers.is(expectedIdentifier));
    }
}
