/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSink;
import org.apache.hive.org.apache.commons.lang.StringUtils;
import org.apache.hive.org.apache.commons.logging.Log;
import org.apache.hive.org.apache.commons.logging.LogFactory;

@InterfaceAudience.Private
public class ReplicationSink {
    private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
    private final Configuration conf;
    private final Connection sharedHtableCon;
    private final MetricsSink metrics;
    private final AtomicLong totalReplicatedEdits = new AtomicLong();

    public ReplicationSink(Configuration conf, Stoppable stopper) throws IOException {
        this.conf = HBaseConfiguration.create(conf);
        this.decorateConf();
        this.metrics = new MetricsSink();
        this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
    }

    private void decorateConf() {
        this.conf.setInt("hbase.client.retries.number", this.conf.getInt("replication.sink.client.retries.number", 4));
        this.conf.setInt("hbase.client.operation.timeout", this.conf.getInt("replication.sink.client.ops.timeout", 10000));
        String replicationCodec = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set("hbase.client.rpc.codec", replicationCodec);
        }
    }

    public void replicateEntries(List<AdminProtos.WALEntry> entries, CellScanner cells) throws IOException {
        if (entries.isEmpty()) {
            return;
        }
        if (cells == null) {
            throw new NullPointerException("TODO: Add handling of null CellScanner");
        }
        try {
            long totalReplicated = 0L;
            TreeMap rowMap = new TreeMap();
            for (AdminProtos.WALEntry wALEntry : entries) {
                TableName table = TableName.valueOf(wALEntry.getKey().getTableName().toByteArray());
                Cell previousCell = null;
                Mutation m = null;
                int count = wALEntry.getAssociatedCellCount();
                for (int i = 0; i < count; ++i) {
                    if (!cells.advance()) {
                        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
                    }
                    Cell cell = cells.current();
                    if (this.isNewRowOrType(previousCell, cell)) {
                        m = CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                        ArrayList<UUID> clusterIds = new ArrayList<UUID>();
                        for (HBaseProtos.UUID clusterId : wALEntry.getKey().getClusterIdsList()) {
                            clusterIds.add(this.toUUID(clusterId));
                        }
                        m.setClusterIds(clusterIds);
                        this.addToHashMultiMap(rowMap, table, clusterIds, m);
                    }
                    if (CellUtil.isDelete(cell)) {
                        ((Delete)m).addDeleteMarker(cell);
                    } else {
                        ((Put)m).add(cell);
                    }
                    previousCell = cell;
                }
                ++totalReplicated;
            }
            for (Map.Entry entry : rowMap.entrySet()) {
                this.batch((TableName)entry.getKey(), ((Map)entry.getValue()).values());
            }
            int size = entries.size();
            this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
            this.metrics.applyBatch(size);
            this.totalReplicatedEdits.addAndGet(totalReplicated);
        }
        catch (IOException ex) {
            LOG.error("Unable to accept edit because:", ex);
            throw ex;
        }
    }

    private boolean isNewRowOrType(Cell previousCell, Cell cell) {
        return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() || !CellUtil.matchingRow(previousCell, cell);
    }

    private UUID toUUID(HBaseProtos.UUID uuid) {
        return new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
    }

    private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
        List<V> values;
        Map<K2, List<List<V>>> innerMap = map.get(key1);
        if (innerMap == null) {
            innerMap = new HashMap<K2, List<V>>();
            map.put(key1, innerMap);
        }
        if ((values = innerMap.get(key2)) == null) {
            values = new ArrayList<V>();
            innerMap.put(key2, values);
        }
        values.add(value);
        return values;
    }

    public void stopReplicationSinkServices() {
        try {
            this.sharedHtableCon.close();
        }
        catch (IOException e) {
            LOG.warn("IOException while closing the connection", e);
        }
    }

    protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
        if (allRows.isEmpty()) {
            return;
        }
        try (Table table = null;){
            table = this.sharedHtableCon.getTable(tableName);
            for (List<Row> rows : allRows) {
                table.batch(rows);
            }
        }
    }

    public String getStats() {
        return this.totalReplicatedEdits.get() == 0L ? "" : "Sink: age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + ", total replicated edits: " + this.totalReplicatedEdits;
    }

    public MetricsSink getSinkMetrics() {
        return this.metrics;
    }
}

