package org.apache.flume.sink.hbase;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.primitives.UnsignedBytes;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.stumbleupon.async.Callback;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hbase/AsyncHBaseSink.class */
/**
 * Flume sink that writes events to HBase through the asynchbase {@link HBaseClient}.
 *
 * <p>Each {@link #process()} call takes up to {@code batchSize} events from the channel inside a
 * channel transaction, asks the configured serializer for the puts and increments of every event,
 * submits them asynchronously, and then waits (bounded by {@code timeout}) for the HBase callbacks
 * before committing or rolling back the transaction.
 */
public class AsyncHBaseSink extends AbstractSink implements Configurable {
    /** Name of the target HBase table, read from the sink configuration. */
    private String tableName;
    /** UTF-8 bytes of the configured column family. */
    private byte[] columnFamily;
    /** Upper bound on the number of events taken from the channel per process() call. */
    private long batchSize;
    private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class);
    /** Serializer that turns each event into put and increment requests. */
    private AsyncHbaseEventSerializer serializer;
    /** Fully qualified class name of the serializer instantiated in configure(). */
    private String eventSerializerType;
    /** Sub-properties of the sink context that are handed to the serializer. */
    private Context serializerContext;
    /** asynchbase client; created in start() and reset to null in stop(). */
    private HBaseClient client;
    /** HBase configuration passed to the constructor or created in configure(); reset to null in stop(). */
    private Configuration conf;
    /** Channel transaction obtained by the most recent process() call. */
    private Transaction txn;
    /** True only between a successful start() and the next stop(); process() refuses to run otherwise. */
    private volatile boolean open;
    private SinkCounter sinkCounter;
    /** Callback wait budget per batch; configure() converts the configured milliseconds to nanoseconds. */
    private long timeout;
    /** ZooKeeper quorum string handed to the HBase client. */
    private String zkQuorum;
    /** ZooKeeper parent znode handed to the HBase client. */
    private String zkBaseDir;
    /** Executor passed to the HBase client in start(); shut down in stop(). */
    private ExecutorService sinkCallbackPool;
    /** Test hook: callbacks sleep before completing and a single-threaded executor is used. */
    private boolean isTimeoutTest;
    /** Test hook: when set, process() accumulates the received callback count in totalCallbacksReceived. */
    private boolean isCoalesceTest;
    /** Durability flag applied to every put via PutRequest.setDurable. */
    private boolean enableWal;
    /** When true, increments addressing the same row/qualifier are merged before being sent. */
    private boolean batchIncrements;
    /** Running total of callbacks seen by process(); only updated when isCoalesceTest is set. */
    private volatile int totalCallbacksReceived;
    /** Per-batch buffer of merged increments; allocated in configure() only when batchIncrements is enabled. */
    private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
    /** Unsigned lexicographic byte[] comparator, assigned in the constructor. */
    private final Comparator<byte[]> COMPARATOR;

    /* loaded from: input_file:org/apache/flume/sink/hbase/AsyncHBaseSink$CellIdentifier.class */
    private class CellIdentifier {
        private final byte[] row;
        private final byte[] column;
        private final int hashCode;

        public CellIdentifier(byte[] bArr, byte[] bArr2) {
            this.row = bArr;
            this.column = bArr2;
            this.hashCode = Arrays.hashCode(bArr) * 31 * Arrays.hashCode(bArr2) * 31;
        }

        public int hashCode() {
            return this.hashCode;
        }

        public boolean equals(Object obj) {
            CellIdentifier cellIdentifier = (CellIdentifier) obj;
            return obj != null && AsyncHBaseSink.this.COMPARATOR.compare(this.row, cellIdentifier.row) == 0 && AsyncHBaseSink.this.COMPARATOR.compare(this.column, cellIdentifier.column) == 0;
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hbase/AsyncHBaseSink$FailureCallback.class */
    /**
     * Error callback attached to every HBase request of a batch.
     *
     * <p>Marks the batch as failed, counts the callback as received and wakes the thread waiting
     * on {@code condition}.
     */
    private class FailureCallback<R, T> implements Callback<R, T> {
        private final Lock lock;
        private final AtomicInteger callbacksReceived;
        private final AtomicBoolean txnFail;
        private final Condition condition;
        private final boolean isTimeoutTesting;

        /**
         * @param lock          lock guarding {@code condition}
         * @param atomicInteger counter of callbacks received for the current batch
         * @param atomicBoolean flag set to true once any request of the batch has failed
         * @param condition     condition the processing thread waits on
         */
        public FailureCallback(Lock lock, AtomicInteger atomicInteger, AtomicBoolean atomicBoolean, Condition condition) {
            this.lock = lock;
            this.callbacksReceived = atomicInteger;
            this.txnFail = atomicBoolean;
            this.condition = condition;
            this.isTimeoutTesting = AsyncHBaseSink.this.isTimeoutTest;
        }

        /**
         * Records the failure and signals the waiting thread.
         *
         * @param t the error delivered by the client (not inspected)
         * @return always {@code null}
         */
        public R call(T t) throws Exception {
            if (this.isTimeoutTesting) {
                try {
                    // Test-only delay so that timeout handling can be exercised.
                    TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4L));
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the delay is best-effort and the callback must still run.
                }
            }
            doCall();
            return null;
        }

        private void doCall() throws Exception {
            // Publish the failure BEFORE bumping the counter. Otherwise the waiter can observe
            // "all callbacks received" while the flag is still false and commit a failed batch.
            this.txnFail.set(true);
            this.callbacksReceived.incrementAndGet();
            this.lock.lock();
            try {
                this.condition.signal();
            } finally {
                this.lock.unlock();
            }
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hbase/AsyncHBaseSink$SuccessCallback.class */
    /**
     * Success callback attached to every HBase request of a batch: counts the callback as
     * received and wakes the thread waiting on {@code condition}.
     */
    private class SuccessCallback<R, T> implements Callback<R, T> {
        private final Lock lock;
        private final AtomicInteger callbacksReceived;
        private final Condition condition;
        private final boolean isTimeoutTesting;

        /**
         * @param lock          lock guarding {@code condition}
         * @param atomicInteger counter of callbacks received for the current batch
         * @param condition     condition the processing thread waits on
         */
        public SuccessCallback(Lock lock, AtomicInteger atomicInteger, Condition condition) {
            this.isTimeoutTesting = AsyncHBaseSink.this.isTimeoutTest;
            this.condition = condition;
            this.callbacksReceived = atomicInteger;
            this.lock = lock;
        }

        /**
         * Records the completed request and signals the waiting thread.
         *
         * @param t the result delivered by the client (not inspected)
         * @return always {@code null}
         */
        public R call(T t) throws Exception {
            if (isTimeoutTesting) {
                delayForTimeoutTest();
            }
            doCall();
            return null;
        }

        /** Test-only pause of four seconds; an interrupt merely ends the pause early. */
        private void delayForTimeoutTest() {
            try {
                TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4L));
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the callback must still be delivered.
            }
        }

        private void doCall() throws Exception {
            callbacksReceived.incrementAndGet();
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Creates a sink without a pre-built HBase configuration; delegates to
     * {@link #AsyncHBaseSink(Configuration)} with {@code null}.
     */
    public AsyncHBaseSink() {
        this(null);
    }

    /**
     * Creates a sink with both test hooks disabled.
     *
     * @param configuration HBase configuration to use, or {@code null}
     */
    public AsyncHBaseSink(Configuration configuration) {
        this(configuration, false, false);
    }

    /**
     * Test-visible constructor that sets every default and both test hooks.
     *
     * @param configuration HBase configuration to use, or {@code null}
     * @param z             stored as {@code isTimeoutTest}
     * @param z2            stored as {@code isCoalesceTest}
     */
    @VisibleForTesting
    AsyncHBaseSink(Configuration configuration, boolean z, boolean z2) {
        // Caller-supplied state.
        conf = configuration;
        isTimeoutTest = z;
        isCoalesceTest = z2;

        // Defaults; configure() may override the WAL and coalescing flags.
        COMPARATOR = UnsignedBytes.lexicographicalComparator();
        open = false;
        enableWal = true;
        batchIncrements = false;
        totalCallbacksReceived = 0;
    }

    /**
     * Drains one batch of events from the channel into HBase.
     *
     * <p>Takes up to {@code batchSize} events inside a channel transaction, submits the puts and
     * increments produced by the serializer, flushes the client and then waits for all callbacks.
     * The transaction is committed only if every callback arrived without a failure before the
     * timeout elapsed; otherwise it is rolled back.
     *
     * @return {@code BACKOFF} if the channel ran dry before the batch was full, {@code READY} otherwise
     * @throws EventDeliveryException if the sink is not open, the channel or HBase interaction
     *         fails, or the callbacks report a failure or time out
     */
    public Sink.Status process() throws EventDeliveryException {
        if (!this.open) {
            throw new EventDeliveryException("Sink was never opened. Please fix the configuration.");
        }
        final AtomicBoolean txnFail = new AtomicBoolean(false);
        final AtomicInteger callbacksReceived = new AtomicInteger(0);
        final AtomicInteger callbacksExpected = new AtomicInteger(0);
        final Lock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();
        if (this.incrementBuffer != null) {
            this.incrementBuffer.clear();
        }
        final Callback<Object, Object> putSuccessCallback =
                new SuccessCallback<Object, Object>(lock, callbacksReceived, condition);
        final Callback<Object, Object> putFailureCallback =
                new FailureCallback<Object, Object>(lock, callbacksReceived, txnFail, condition);
        final Callback<Long, Long> incrementSuccessCallback =
                new SuccessCallback<Long, Long>(lock, callbacksReceived, condition);
        final Callback<Long, Long> incrementFailureCallback =
                new FailureCallback<Long, Long>(lock, callbacksReceived, txnFail, condition);

        Sink.Status status = Sink.Status.READY;
        final Channel channel = getChannel();
        int eventCount = 0;
        try {
            this.txn = channel.getTransaction();
            this.txn.begin();
            for (; eventCount < this.batchSize; eventCount++) {
                final Event event = channel.take();
                if (event == null) {
                    status = Sink.Status.BACKOFF;
                    if (eventCount == 0) {
                        this.sinkCounter.incrementBatchEmptyCount();
                    } else {
                        this.sinkCounter.incrementBatchUnderflowCount();
                    }
                    // Channel is drained: stop taking, otherwise this loop never terminates.
                    break;
                }
                this.serializer.setEvent(event);
                final List<PutRequest> actions = this.serializer.getActions();
                final List<AtomicIncrementRequest> increments = this.serializer.getIncrements();
                callbacksExpected.addAndGet(actions.size());
                if (!this.batchIncrements) {
                    // Coalesced increments are counted once, after merging, further below.
                    callbacksExpected.addAndGet(increments.size());
                }
                for (PutRequest put : actions) {
                    put.setDurable(this.enableWal);
                    this.client.put(put).addCallbacks(putSuccessCallback, putFailureCallback);
                }
                for (AtomicIncrementRequest increment : increments) {
                    if (this.batchIncrements) {
                        final CellIdentifier cell = new CellIdentifier(increment.key(), increment.qualifier());
                        final AtomicIncrementRequest buffered = this.incrementBuffer.get(cell);
                        if (buffered == null) {
                            this.incrementBuffer.put(cell, increment);
                        } else {
                            buffered.setAmount(buffered.getAmount() + increment.getAmount());
                        }
                    } else {
                        this.client.atomicIncrement(increment)
                                .addCallbacks(incrementSuccessCallback, incrementFailureCallback);
                    }
                }
            }
            if (this.batchIncrements) {
                final Collection<AtomicIncrementRequest> coalesced = this.incrementBuffer.values();
                for (AtomicIncrementRequest increment : coalesced) {
                    this.client.atomicIncrement(increment)
                            .addCallbacks(incrementSuccessCallback, incrementFailureCallback);
                }
                callbacksExpected.addAndGet(coalesced.size());
            }
            this.client.flush();
        } catch (Throwable th) {
            handleTransactionFailure(this.txn);
            checkIfChannelExceptionAndThrow(th);
        }
        if (eventCount == this.batchSize) {
            this.sinkCounter.incrementBatchCompleteCount();
        }
        this.sinkCounter.addToEventDrainAttemptCount(eventCount);

        // The lock must be held across the WHOLE wait loop: Condition.await requires it on every
        // iteration, and it has to be released exactly once even if the loop body never runs.
        lock.lock();
        try {
            final long startTime = System.nanoTime();
            while (callbacksReceived.get() < callbacksExpected.get() && !txnFail.get()) {
                final long remaining = Math.max(0L, this.timeout - (System.nanoTime() - startTime));
                try {
                    if (!condition.await(remaining, TimeUnit.NANOSECONDS)) {
                        txnFail.set(true);
                        logger.warn("HBase callbacks timed out. Transaction will be rolled back.");
                    }
                } catch (Exception e) {
                    logger.error("Exception while waiting for callbacks from HBase.", e);
                    handleTransactionFailure(this.txn);
                    Throwables.propagate(e);
                }
            }
        } finally {
            lock.unlock();
        }

        if (this.isCoalesceTest) {
            this.totalCallbacksReceived += callbacksReceived.get();
        }
        if (txnFail.get()) {
            handleTransactionFailure(this.txn);
            throw new EventDeliveryException("Could not write events to Hbase. Transaction failed, and rolled back.");
        }
        try {
            this.txn.commit();
            this.txn.close();
            this.sinkCounter.addToEventDrainSuccessCount(eventCount);
        } catch (Throwable th2) {
            handleTransactionFailure(this.txn);
            checkIfChannelExceptionAndThrow(th2);
        }
        return status;
    }

    /**
     * Reads the sink settings from the Flume context.
     *
     * <p>Resolves table, column family, batch size, serializer, callback timeout, ZooKeeper
     * coordinates, the WAL flag and the increment-coalescing flag, and instantiates and
     * initializes the event serializer.
     *
     * @param context sink configuration
     * @throws NullPointerException  if the table name or column family is missing
     * @throws IllegalStateException if no ZooKeeper quorum can be determined
     */
    public void configure(Context context) {
        tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
        final String familyName = context.getString(HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
        batchSize = context.getLong(HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, Long.valueOf(100L)).longValue();
        serializerContext = new Context();
        eventSerializerType = context.getString(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER);
        Preconditions.checkNotNull(tableName, "Table name cannot be empty, please specify in configuration file");
        Preconditions.checkNotNull(familyName, "Column family cannot be empty, please specify in configuration file");

        // Fall back to the bundled serializer when none is configured.
        final boolean serializerMissing = eventSerializerType == null || eventSerializerType.isEmpty();
        if (serializerMissing) {
            eventSerializerType = "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer";
            logger.info("No serializer defined, Will use default");
        }
        serializerContext.putAll(context.getSubProperties(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
        columnFamily = familyName.getBytes(Charsets.UTF_8);
        try {
            final Class<?> serializerClass = Class.forName(eventSerializerType);
            serializer = (AsyncHbaseEventSerializer) serializerClass.newInstance();
            serializer.configure(serializerContext);
            serializer.initialize(tableName.getBytes(Charsets.UTF_8), columnFamily);
        } catch (Exception e) {
            logger.error("Could not instantiate event serializer.", e);
            Throwables.propagate(e);
        }

        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }

        // The timeout is configured in milliseconds and kept in nanoseconds.
        timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT, Long.valueOf(HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT)).longValue();
        if (timeout <= 0) {
            logger.warn("Timeout should be positive for Hbase sink. Sink will not timeout.");
            timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
        }
        timeout = TimeUnit.MILLISECONDS.toNanos(timeout);

        // ZooKeeper coordinates come from the sink context when a quorum is given there,
        // otherwise from the HBase configuration.
        zkQuorum = context.getString(HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim();
        if (!zkQuorum.isEmpty()) {
            zkBaseDir = context.getString(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
        } else {
            if (conf == null) {
                conf = HBaseConfiguration.create();
            }
            zkQuorum = ZKConfig.getZKQuorumServersString(conf);
            zkBaseDir = conf.get("zookeeper.znode.parent", HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
        }
        Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(), "The Zookeeper quorum cannot be null and should be specified.");

        enableWal = context.getBoolean(HBaseSinkConfigurationConstants.CONFIG_ENABLE_WAL, true).booleanValue();
        logger.info("The write to WAL option is set to: " + enableWal);
        if (!enableWal) {
            logger.warn("AsyncHBaseSink's enableWal configuration is set to false. All writes to HBase will have WAL disabled, and any data in the memstore of this region in the Region Server could be lost!");
        }

        batchIncrements = context.getBoolean(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS).booleanValue();
        if (batchIncrements) {
            incrementBuffer = Maps.newHashMap();
            logger.info("Increment coalescing is enabled. Increments will be buffered.");
        }
    }

    /**
     * @return the callback total accumulated by process(); it only grows when the coalesce test
     *         hook is enabled
     */
    @VisibleForTesting
    int getTotalCallbacksReceived() {
        return totalCallbacksReceived;
    }

    /** @return {@code true} when no HBase configuration is currently held */
    @VisibleForTesting
    boolean isConfNull() {
        return null == conf;
    }

    /**
     * Connects to HBase and verifies that the configured table and column family exist.
     *
     * <p>Blocks until the existence check has completed. On any failure the client and the
     * callback pool are released so that {@code start()} can be attempted again.
     *
     * @throws IllegalArgumentException if a client from a previous start is still present
     * @throws FlumeException if the table or column family check fails or the wait is interrupted
     */
    public void start() {
        Preconditions.checkArgument(this.client == null, "Please call stop before calling start on an old instance.");
        this.sinkCounter.start();
        this.sinkCounter.incrementConnectionCreatedCount();
        if (this.isTimeoutTest) {
            this.sinkCallbackPool = Executors.newSingleThreadExecutor();
        } else {
            this.sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(getName() + " HBase Call Pool").build());
        }
        this.client = new HBaseClient(this.zkQuorum, this.zkBaseDir, this.sinkCallbackPool);
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean fail = new AtomicBoolean(false);
        this.client.ensureTableFamilyExists(this.tableName.getBytes(Charsets.UTF_8), this.columnFamily).addCallbacks(new Callback<Object, Object>() {
            public Object call(Object obj) throws Exception {
                latch.countDown();
                return null;
            }
        }, new Callback<Object, Object>() {
            public Object call(Object obj) throws Exception {
                fail.set(true);
                latch.countDown();
                return null;
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            this.sinkCounter.incrementConnectionFailedCount();
            // Without releasing the client every later start() would fail the precondition above.
            releaseClientAfterFailedStart();
            // Restore the flag so callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new FlumeException("Interrupted while waiting for Hbase Callbacks", e);
        }
        if (fail.get()) {
            this.sinkCounter.incrementConnectionFailedCount();
            releaseClientAfterFailedStart();
            throw new FlumeException("Could not start sink. Table or column family does not exist in Hbase.");
        }
        this.open = true;
        this.client.setFlushInterval((short) 0);
        super.start();
    }

    /** Shuts down the client and its callback pool after start() could not complete. */
    private void releaseClientAfterFailedStart() {
        this.client.shutdown();
        this.client = null;
        // shutdown() is idempotent, so a later stop() may call it again; the reference is kept for that.
        this.sinkCallbackPool.shutdown();
    }

    /**
     * Releases the serializer, the HBase client and the callback pool and marks the sink closed.
     *
     * <p>Safe to call on a sink that was never configured or started: components that were never
     * created are skipped instead of failing with a {@code NullPointerException}.
     */
    public void stop() {
        if (this.serializer != null) {
            this.serializer.cleanUp();
        }
        if (this.client != null) {
            this.client.shutdown();
        }
        if (this.sinkCounter != null) {
            this.sinkCounter.incrementConnectionClosedCount();
            this.sinkCounter.stop();
        }
        if (this.sinkCallbackPool != null) {
            this.sinkCallbackPool.shutdown();
            try {
                // Give in-flight callbacks a short grace period before forcing termination.
                if (!this.sinkCallbackPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.sinkCallbackPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted while waiting for asynchbase sink pool to die", e);
                this.sinkCallbackPool.shutdownNow();
                // Preserve the interrupt for whoever is stopping the sink.
                Thread.currentThread().interrupt();
            }
        }
        this.sinkCallbackPool = null;
        this.client = null;
        this.conf = null;
        this.open = false;
        super.stop();
    }

    /**
     * Rolls back and closes the given channel transaction.
     *
     * <p>The transaction is closed exactly once, whether or not the rollback succeeds.
     *
     * @param transaction the transaction to abandon
     * @throws EventDeliveryException if the rollback fails with a checked exception; errors and
     *         runtime exceptions from the rollback are rethrown unchanged
     */
    private void handleTransactionFailure(Transaction transaction) throws EventDeliveryException {
        try {
            transaction.rollback();
        } catch (Throwable th) {
            logger.error("Failed to commit transaction.Transaction rolled back.", th);
            if ((th instanceof Error) || (th instanceof RuntimeException)) {
                // propagate() rethrows unchecked throwables as-is, so control never continues past it.
                Throwables.propagate(th);
            }
            throw new EventDeliveryException("Failed to commit transaction.Transaction rolled back.", th);
        } finally {
            transaction.close();
        }
    }

    /**
     * Translates a failure raised while processing a batch; this method never returns normally.
     *
     * <p>Errors and runtime exceptions other than {@link ChannelException} are rethrown unchanged;
     * everything else is wrapped in an {@link EventDeliveryException}.
     *
     * @param th the failure to translate
     * @throws EventDeliveryException for channel exceptions and checked throwables
     */
    private void checkIfChannelExceptionAndThrow(Throwable th) throws EventDeliveryException {
        final boolean unchecked = (th instanceof Error) || (th instanceof RuntimeException);
        final boolean channelFailure = th instanceof ChannelException;
        if (unchecked && !channelFailure) {
            Throwables.propagate(th);
        }
        throw new EventDeliveryException("Error in processing transaction.", th);
    }
}
