/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.hbase;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.hbase.BatchAware;
import org.apache.flume.sink.hbase.HBaseSinkConfigurationConstants;
import org.apache.flume.sink.hbase.HBaseVersionCheck;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseSink
extends AbstractSink
implements Configurable {
    // Value of the "table" property; handed to the HTable constructor in start().
    private String tableName;
    // UTF-8 bytes of the "columnFamily" property; passed to the serializer for every event.
    private byte[] columnFamily;
    // Table handle opened in start() and closed / reset to null in stop().
    private HTable table;
    // Maximum number of channel takes per process() call ("batchSize" property, default 100).
    private long batchSize;
    // HBase configuration supplied through the constructor; configure() may set the
    // ZooKeeper quorum, client port and znode parent on it.
    private Configuration config;
    private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
    // Serializer instantiated by class name in configure().
    private HbaseEventSerializer serializer;
    // Class name read from the "serializer" property; configure() substitutes
    // SimpleHbaseEventSerializer when it is missing or empty.
    private String eventSerializerType;
    // Sub-properties under the "serializer." prefix, handed to the serializer's configure().
    private Context serializerContext;
    // Credentials passed to FlumeAuthenticationUtil.getAuthenticator() in start().
    private String kerberosPrincipal;
    private String kerberosKeytab;
    // "enableWal" property; applied to each Put/Increment through setWriteToWAL().
    private boolean enableWal = true;
    // "coalesceIncrements" property; when true, increments are merged by
    // coalesceIncrements() before being sent to the table.
    private boolean batchIncrements = false;
    // Reflective handle resolved by reflectLookupGetFamilyMap(); only assigned in
    // configure() when batchIncrements is true.
    private Method refGetFamilyMap = null;
    // Counter created in configure() using the sink's name.
    private SinkCounter sinkCounter;
    // Executor obtained in start(); the HBase calls in this class are run through it.
    private PrivilegedExecutor privilegedExecutor;
    // Optional hook (set only by the package-private constructor) invoked after the
    // increments of a batch have been prepared.
    private DebugIncrementsCallback debugIncrCallback = null;

    /**
     * Creates a sink backed by the configuration returned from
     * {@link HBaseConfiguration#create()}.
     */
    public HBaseSink() {
        this(HBaseConfiguration.create());
    }

    /**
     * Creates a sink that uses the supplied HBase configuration.
     *
     * @param conf configuration stored by the sink; {@link #configure(Context)} may
     *             later set ZooKeeper-related keys on it
     */
    public HBaseSink(Configuration conf) {
        config = conf;
    }

    /**
     * Test-only constructor that additionally registers a callback invoked with the
     * increments of each batch right before they are applied.
     *
     * @param conf configuration stored by the sink
     * @param cb   callback to notify; stored as-is
     */
    @VisibleForTesting
    @InterfaceAudience.Private
    HBaseSink(Configuration conf, DebugIncrementsCallback cb) {
        config = conf;
        debugIncrCallback = cb;
    }

    /**
     * Logs in with the configured credentials, opens the target table and verifies
     * that the configured column family exists before marking the sink as started.
     *
     * <p>If the column family check fails, the table opened by this call is closed
     * and the handle is reset, so that a subsequent {@code start()} is not rejected
     * by the "call stop first" precondition and the real cause is reported again.
     *
     * @throws IllegalArgumentException if a table handle from a previous start is still held
     * @throws FlumeException if login, table creation or the column family check fails
     */
    @Override
    public void start() {
        Preconditions.checkArgument(this.table == null, "Please call stop before calling start on an old instance.");
        try {
            this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(this.kerberosPrincipal, this.kerberosKeytab);
        }
        catch (Exception ex) {
            this.sinkCounter.incrementConnectionFailedCount();
            throw new FlumeException("Failed to login to HBase using provided credentials.", ex);
        }
        try {
            this.table = this.privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {

                @Override
                public HTable run() throws Exception {
                    HTable opened = new HTable(HBaseSink.this.config, HBaseSink.this.tableName);
                    // Writes are buffered client-side rather than flushed per operation.
                    opened.setAutoFlush(false);
                    return opened;
                }
            });
        }
        catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            logger.error("Could not load table, " + this.tableName + " from HBase", e);
            throw new FlumeException("Could not load table, " + this.tableName + " from HBase", e);
        }
        try {
            Boolean hasFamily = this.privilegedExecutor.execute(new PrivilegedExceptionAction<Boolean>() {

                @Override
                public Boolean run() throws IOException {
                    return HBaseSink.this.table.getTableDescriptor().hasFamily(HBaseSink.this.columnFamily);
                }
            });
            if (!hasFamily.booleanValue()) {
                throw new IOException("Table " + this.tableName + " has no such column family " + Bytes.toString(this.columnFamily));
            }
        }
        catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            // Release the table opened above; otherwise it leaks and the non-null
            // handle makes every retry of start() fail the precondition instead.
            HTable failedTable = this.table;
            this.table = null;
            if (failedTable != null) {
                try {
                    failedTable.close();
                }
                catch (Exception closeEx) {
                    logger.warn("Error closing table " + this.tableName + " after failed start.", closeEx);
                }
            }
            throw new FlumeException("Error getting column family from HBase.Please verify that the table " + this.tableName + " and Column Family, " + Bytes.toString(this.columnFamily) + " exists in HBase, and the current user has permissions to access that table.", e);
        }
        super.start();
        this.sinkCounter.incrementConnectionCreatedCount();
        this.sinkCounter.start();
    }

    /**
     * Closes the table handle (if any) and stops the sink counter.
     *
     * <p>The handle is cleared even when closing fails, so the instance can be
     * started again afterwards instead of being rejected by the precondition in
     * {@link #start()}.
     *
     * @throws FlumeException if closing the table raises an {@link IOException}
     */
    @Override
    public void stop() {
        // NOTE(review): super.stop() is not invoked here, unlike super.start() in
        // start() — confirm whether the lifecycle state is expected to be reset.
        try {
            if (this.table != null) {
                this.table.close();
            }
        }
        catch (IOException e) {
            throw new FlumeException("Error closing table.", e);
        }
        finally {
            // Always drop the reference; a failed close must not block a restart.
            this.table = null;
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
    }

    /**
     * Reads the sink settings from the given context.
     *
     * <p>Properties read: {@code table}, {@code columnFamily}, {@code batchSize}
     * (default 100), {@code serializer} plus its {@code serializer.*} sub-properties,
     * {@code kerberosKeytab}, {@code kerberosPrincipal}, {@code enableWal}
     * (default true), {@code coalesceIncrements}, {@code zookeeperQuorum} and
     * {@code znodeParent}.
     *
     * <p>Every entry of {@code zookeeperQuorum} must have the form {@code host:port}
     * and all entries must use the same port. An entry without a port is rejected
     * with a {@link FlumeException} (previously it surfaced as an
     * {@code ArrayIndexOutOfBoundsException}).
     *
     * @param context configuration of this sink
     * @throws ConfigurationException if the HBase version check fails
     * @throws NullPointerException if {@code table} or {@code columnFamily} is missing
     * @throws FlumeException if the ZooKeeper quorum specification is malformed
     */
    @Override
    public void configure(Context context) {
        if (!HBaseVersionCheck.hasVersionLessThan2(logger)) {
            throw new ConfigurationException("HBase major version number must be less than 2 for hbase-sink.");
        }
        this.tableName = context.getString("table");
        String cf = context.getString("columnFamily");
        this.batchSize = context.getLong("batchSize", Long.valueOf(100L));
        this.serializerContext = new Context();
        this.eventSerializerType = context.getString("serializer");
        Preconditions.checkNotNull(this.tableName, "Table name cannot be empty, please specify in configuration file");
        Preconditions.checkNotNull(cf, "Column family cannot be empty, please specify in configuration file");
        if (this.eventSerializerType == null || this.eventSerializerType.isEmpty()) {
            this.eventSerializerType = "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer";
            logger.info("No serializer defined, Will use default");
        }
        this.serializerContext.putAll(context.getSubProperties("serializer."));
        this.columnFamily = cf.getBytes(Charsets.UTF_8);
        try {
            Class<?> clazz = Class.forName(this.eventSerializerType);
            this.serializer = (HbaseEventSerializer)clazz.newInstance();
            this.serializer.configure(this.serializerContext);
        }
        catch (Exception e) {
            logger.error("Could not instantiate event serializer.", e);
            // propagate() always throws; the explicit throw makes that visible.
            throw Throwables.propagate(e);
        }
        this.kerberosKeytab = context.getString("kerberosKeytab");
        this.kerberosPrincipal = context.getString("kerberosPrincipal");
        this.enableWal = context.getBoolean("enableWal", Boolean.TRUE);
        logger.info("The write to WAL option is set to: " + this.enableWal);
        if (!this.enableWal) {
            logger.warn("HBase Sink's enableWal configuration is set to false. All writes to HBase will have WAL disabled, and any data in the memstore of this region in the Region Server could be lost!");
        }
        this.batchIncrements = context.getBoolean("coalesceIncrements", HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
        if (this.batchIncrements) {
            logger.info("Increment coalescing is enabled. Increments will be buffered.");
            this.refGetFamilyMap = HBaseSink.reflectLookupGetFamilyMap();
        }
        String zkQuorum = context.getString("zookeeperQuorum");
        Integer port = null;
        if (zkQuorum != null && !zkQuorum.isEmpty()) {
            StringBuilder zkBuilder = new StringBuilder();
            logger.info("Using ZK Quorum: " + zkQuorum);
            String[] zkHosts = zkQuorum.split(",");
            int length = zkHosts.length;
            for (int i = 0; i < length; ++i) {
                String[] zkHostAndPort = zkHosts[i].split(":");
                // split() never yields null elements; a missing port shows up as a
                // shorter array, so the length is what has to be checked.
                if (zkHostAndPort.length < 2) {
                    throw new FlumeException("Expected client port for the ZK node!");
                }
                zkBuilder.append(zkHostAndPort[0].trim());
                if (i != length - 1) {
                    zkBuilder.append(",");
                } else {
                    // Last entry: replace the raw value with the port-less host list.
                    zkQuorum = zkBuilder.toString();
                }
                int parsedPort = Integer.parseInt(zkHostAndPort[1].trim());
                if (port == null) {
                    port = parsedPort;
                } else if (port.intValue() != parsedPort) {
                    throw new FlumeException("All Zookeeper nodes in the quorum must use the same client port.");
                }
            }
            if (port == null) {
                // Only reachable when the quorum string contained no entries at all
                // (e.g. only commas); fall back to the built-in port value.
                port = 5181;
            }
            this.config.set("hbase.zookeeper.quorum", zkQuorum);
            this.config.setInt("hbase.zookeeper.property.clientPort", port.intValue());
        }
        String hbaseZnode = context.getString("znodeParent");
        if (hbaseZnode != null && !hbaseZnode.isEmpty()) {
            this.config.set("zookeeper.znode.parent", hbaseZnode);
        }
        this.sinkCounter = new SinkCounter(this.getName());
    }

    /**
     * Returns the HBase configuration held by this sink.
     *
     * @return the same {@link Configuration} instance the sink uses (not a copy)
     */
    public Configuration getConfig() {
        return config;
    }

    /**
     * Drains up to {@code batchSize} events from the channel inside one transaction,
     * serializes them into row actions and increments, writes them to HBase and
     * commits the transaction.
     *
     * <p>On any failure the transaction is rolled back and the failure is logged
     * once. Errors and runtime exceptions are rethrown unchanged; every other
     * throwable is wrapped in an {@link EventDeliveryException}.
     *
     * @return {@code BACKOFF} when the channel returned no event at all,
     *         {@code READY} otherwise
     * @throws EventDeliveryException if a checked exception occurred while taking,
     *         serializing or writing events
     */
    @Override
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = this.getChannel();
        Transaction txn = channel.getTransaction();
        List<Row> actions = new LinkedList<Row>();
        List<Increment> incs = new LinkedList<Increment>();
        try {
            txn.begin();
            if (this.serializer instanceof BatchAware) {
                ((BatchAware)this.serializer).onBatchStart();
            }
            long i = 0L;
            for (; i < this.batchSize; ++i) {
                Event event = channel.take();
                if (event == null) {
                    // Channel ran dry: either nothing at all or a partial batch.
                    if (i == 0L) {
                        status = Sink.Status.BACKOFF;
                        this.sinkCounter.incrementBatchEmptyCount();
                    } else {
                        this.sinkCounter.incrementBatchUnderflowCount();
                    }
                    break;
                }
                this.serializer.initialize(event, this.columnFamily);
                actions.addAll(this.serializer.getActions());
                incs.addAll(this.serializer.getIncrements());
            }
            if (i == this.batchSize) {
                this.sinkCounter.incrementBatchCompleteCount();
            }
            this.sinkCounter.addToEventDrainAttemptCount(i);
            this.putEventsAndCommit(actions, incs, txn);
        }
        catch (Throwable e) {
            try {
                txn.rollback();
            }
            catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been successful.", e2);
            }
            // Log the failure exactly once; it used to be emitted twice per failure.
            logger.error("Failed to commit transaction.Transaction rolled back.", e);
            if (e instanceof Error || e instanceof RuntimeException) {
                // Rethrows e itself, since it is already unchecked.
                throw Throwables.propagate(e);
            }
            throw new EventDeliveryException("Failed to commit transaction.Transaction rolled back.", e);
        }
        finally {
            txn.close();
        }
        return status;
    }

    /**
     * Writes the collected row actions and increments to the table through the
     * privileged executor and then commits the channel transaction.
     *
     * <p>The row actions are sent as one batch first; the increments follow one by
     * one, merged by {@code coalesceIncrements} beforehand when coalescing is
     * enabled. The configured WAL setting is applied to every {@link Put} and
     * {@link Increment} before it is sent.
     *
     * @param actions row actions produced by the serializer for this batch
     * @param incs    increments produced by the serializer for this batch
     * @param txn     open channel transaction to commit after the writes succeed
     * @throws Exception whatever the HBase calls or the executor raise; the
     *         transaction is not committed in that case
     */
    private void putEventsAndCommit(final List<Row> actions, final List<Increment> incs, Transaction txn) throws Exception {
        this.privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {
                for (Row r : actions) {
                    if (r instanceof Put) {
                        ((Put)r).setWriteToWAL(HBaseSink.this.enableWal);
                    }
                    if (r instanceof Increment) {
                        ((Increment)r).setWriteToWAL(HBaseSink.this.enableWal);
                    }
                }
                HBaseSink.this.table.batch(actions);
                return null;
            }
        });
        this.privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {
                List<Increment> processedIncrements;
                if (HBaseSink.this.batchIncrements) {
                    processedIncrements = HBaseSink.this.coalesceIncrements(incs);
                } else {
                    processedIncrements = incs;
                }
                if (HBaseSink.this.debugIncrCallback != null) {
                    HBaseSink.this.debugIncrCallback.onAfterCoalesce(processedIncrements);
                }
                for (Increment inc : processedIncrements) {
                    inc.setWriteToWAL(HBaseSink.this.enableWal);
                    HBaseSink.this.table.increment(inc);
                }
                return null;
            }
        });
        txn.commit();
        // NOTE(review): the success counter is advanced by the number of row actions,
        // while the attempt counter in process() counts events — confirm this is intended.
        this.sinkCounter.addToEventDrainSuccessCount((long)actions.size());
    }

    /**
     * Finds the {@link Increment} accessor used for coalescing by trying the
     * public no-argument methods {@code getFamilyMapOfLongs} and {@code getFamilyMap},
     * in that order, and accepting the first one whose declared return type is
     * exactly {@link Map}.
     *
     * <p>A method that exists but has a different return type is skipped and never
     * returned; previously such a method could leak out as the result when the
     * remaining candidates were missing.
     *
     * @return the matching accessor
     * @throws UnsupportedOperationException if no candidate matches
     */
    @VisibleForTesting
    static Method reflectLookupGetFamilyMap() {
        String[] methodNames = new String[]{"getFamilyMapOfLongs", "getFamilyMap"};
        Method m = null;
        for (String methodName : methodNames) {
            try {
                Method candidate = Increment.class.getMethod(methodName, new Class[0]);
                if (candidate.getReturnType().equals(Map.class)) {
                    logger.debug("Using Increment.{} for coalesce", (Object)methodName);
                    // Only a candidate with the expected return type is kept.
                    m = candidate;
                    break;
                }
            }
            catch (NoSuchMethodException e) {
                logger.debug("Increment.{} does not exist. Exception follows.", (Object)methodName, (Object)e);
            }
            catch (SecurityException e) {
                logger.debug("No access to Increment.{}; Exception follows.", (Object)methodName, (Object)e);
            }
        }
        if (m == null) {
            throw new UnsupportedOperationException("Cannot find Increment.getFamilyMap()");
        }
        return m;
    }

    /**
     * Reads the family map of an increment through the reflectively resolved
     * accessor stored in {@code refGetFamilyMap}.
     *
     * @param inc increment to inspect; must not be null
     * @return the value returned by the accessor, cast to a map of
     *         family to (qualifier to amount)
     * @throws NullPointerException if the accessor has not been resolved or
     *         {@code inc} is null
     * @throws RuntimeException wrapping the reflection failure if the call fails
     */
    @SuppressWarnings("unchecked")
    private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
        Preconditions.checkNotNull(this.refGetFamilyMap, "Increment.getFamilymap() not found");
        Preconditions.checkNotNull(inc, "Increment required");
        try {
            Object result = this.refGetFamilyMap.invoke(inc);
            return (Map<byte[], NavigableMap<byte[], Long>>)result;
        }
        catch (IllegalAccessException accessFailure) {
            logger.warn("Unexpected error calling getFamilyMap()", accessFailure);
            // propagate() always throws, so nothing is returned on this path.
            throw Throwables.propagate(accessFailure);
        }
        catch (InvocationTargetException invocationFailure) {
            logger.warn("Unexpected error calling getFamilyMap()", invocationFailure);
            throw Throwables.propagate(invocationFailure);
        }
    }

    /**
     * Merges increments that target the same row, family and qualifier by summing
     * their amounts, producing one {@link Increment} per distinct row.
     *
     * <p>Rows, families and qualifiers are keyed with {@code Bytes.BYTES_COMPARATOR},
     * so byte arrays with equal content are treated as the same key and the result
     * is ordered by that comparator.
     *
     * @param incs increments to merge; must not be null
     * @return a new list with one increment per distinct row
     * @throws NullPointerException if {@code incs} is null
     */
    private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
        Preconditions.checkNotNull(incs, "List of Increments must not be null");
        // row -> family -> qualifier -> accumulated amount
        Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (Increment inc : incs) {
            byte[] row = inc.getRow();
            Map<byte[], NavigableMap<byte[], Long>> families = this.getFamilyMap(inc);
            for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
                byte[] family = familyEntry.getKey();
                NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
                for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
                    byte[] qualifier = qualifierEntry.getKey();
                    Long count = qualifierEntry.getValue();
                    this.incrementCounter(counters, row, family, qualifier, count);
                }
            }
        }
        // Rebuild one Increment per row from the accumulated totals.
        List<Increment> coalesced = Lists.newLinkedList();
        for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
            byte[] row = rowEntry.getKey();
            Map<byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
            Increment inc = new Increment(row);
            for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
                byte[] family = familyEntry.getKey();
                NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
                for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
                    byte[] qualifier = qualifierEntry.getKey();
                    long count = qualifierEntry.getValue();
                    inc.addColumn(family, qualifier, count);
                }
            }
            coalesced.add(inc);
        }
        return coalesced;
    }

    /**
     * Adds {@code count} to the running total stored under row / family / qualifier,
     * creating the intermediate maps on first use.
     *
     * @param counters  accumulator: row to family to qualifier to total; modified in place
     * @param row       row key
     * @param family    column family
     * @param qualifier column qualifier
     * @param count     amount to add; stored as-is when no total exists yet
     */
    private void incrementCounter(Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters, byte[] row, byte[] family, byte[] qualifier, Long count) {
        Map<byte[], NavigableMap<byte[], Long>> families = counters.get(row);
        if (families == null) {
            // Content-based comparator: byte[] has identity equals/hashCode.
            families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            counters.put(row, families);
        }
        NavigableMap<byte[], Long> qualifiers = families.get(family);
        if (qualifiers == null) {
            qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            families.put(family, qualifiers);
        }
        Long existingValue = qualifiers.get(qualifier);
        if (existingValue == null) {
            qualifiers.put(qualifier, count);
        } else {
            qualifiers.put(qualifier, existingValue + count);
        }
    }

    /**
     * Exposes the serializer instance for tests.
     *
     * @return the serializer created by {@link #configure(Context)}, or null if
     *         none has been assigned yet
     */
    @VisibleForTesting
    @InterfaceAudience.Private
    HbaseEventSerializer getSerializer() {
        return serializer;
    }

    /**
     * Test hook notified with the increments of a batch right before they are
     * applied to the table (after coalescing, when coalescing is enabled).
     */
    @VisibleForTesting
    @InterfaceAudience.Private
    interface DebugIncrementsCallback {
        /**
         * Called once per batch with the increments about to be applied.
         *
         * @param increments the increments that will be sent to the table
         */
        void onAfterCoalesce(Iterable<Increment> increments);
    }
}

