package org.apache.storm.hive.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
import org.apache.storm.hive.common.HiveWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hive/bolt/HiveBolt.class */
public class HiveBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
    private OutputCollector collector;
    private HiveOptions options;
    private ExecutorService callTimeoutPool;
    private transient Timer heartBeatTimer;
    HashMap<HiveEndPoint, HiveWriter> allWriters;
    private Boolean kerberosEnabled = false;
    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
    private UserGroupInformation ugi = null;
    private Integer currentBatchSize = 0;

    public HiveBolt(HiveOptions hiveOptions) {
        this.options = hiveOptions;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        try {
            if (this.options.getKerberosPrincipal() == null && this.options.getKerberosKeytab() == null) {
                this.kerberosEnabled = false;
            } else {
                if (this.options.getKerberosPrincipal() == null || this.options.getKerberosKeytab() == null) {
                    throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal  & KerberosKeytab");
                }
                this.kerberosEnabled = true;
            }
            if (this.kerberosEnabled.booleanValue()) {
                try {
                    this.ugi = HiveUtils.authenticate(this.options.getKerberosKeytab(), this.options.getKerberosPrincipal());
                } catch (HiveUtils.AuthenticationFailed e) {
                    LOG.error("Hive Kerberos authentication failed " + e.getMessage(), e);
                    throw new IllegalArgumentException(e);
                }
            }
            this.collector = outputCollector;
            this.allWriters = new HashMap<>();
            this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("hive-bolt-%d").build());
            this.heartBeatTimer = new Timer();
            setupHeartBeatTimer();
        } catch (Exception e2) {
            LOG.warn("unable to make connection to hive ", e2);
        }
    }

    public void execute(Tuple tuple) {
        try {
            HiveWriter orCreateWriter = getOrCreateWriter(HiveUtils.makeEndPoint(this.options.getMapper().mapPartitions(tuple), this.options));
            if (this.timeToSendHeartBeat.compareAndSet(true, false)) {
                enableHeartBeatOnAllWriters();
            }
            orCreateWriter.write(this.options.getMapper().mapRecord(tuple));
            Integer num = this.currentBatchSize;
            this.currentBatchSize = Integer.valueOf(this.currentBatchSize.intValue() + 1);
            if (this.currentBatchSize.intValue() >= this.options.getBatchSize().intValue()) {
                flushAllWriters();
                this.currentBatchSize = 0;
            }
            this.collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
            flushAndCloseWriters();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void cleanup() {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                HiveWriter value = entry.getValue();
                LOG.info("Flushing writer to {}", value);
                value.flush(false);
                LOG.info("Closing writer to {}", value);
                value.close();
            } catch (Exception e) {
                LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        for (ExecutorService executorService : new ExecutorService[]{this.callTimeoutPool}) {
            executorService.shutdown();
            while (!executorService.isTerminated()) {
                try {
                    executorService.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    LOG.warn("shutdown interrupted on " + executorService, e2);
                }
            }
        }
        this.callTimeoutPool = null;
        super.cleanup();
        LOG.info("Hive Bolt stopped");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval().intValue() > 0) {
            this.heartBeatTimer.schedule(new TimerTask() { // from class: org.apache.storm.hive.bolt.HiveBolt.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    HiveBolt.this.timeToSendHeartBeat.set(true);
                    HiveBolt.this.setupHeartBeatTimer();
                }
            }, this.options.getHeartBeatInterval().intValue() * 1000);
        }
    }

    private void flushAllWriters() throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().flush(true);
        }
    }

    private void closeAllWriters() {
        try {
            Iterator<Map.Entry<HiveEndPoint, HiveWriter>> it = this.allWriters.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().close();
            }
            this.allWriters.clear();
        } catch (Exception e) {
            LOG.warn("unable to close writers. ", e);
        }
    }

    private void flushAndCloseWriters() {
        try {
            try {
                flushAllWriters();
                closeAllWriters();
            } catch (Exception e) {
                LOG.warn("unable to flush hive writers. ", e);
                closeAllWriters();
            }
        } catch (Throwable th) {
            closeAllWriters();
            throw th;
        }
    }

    private void enableHeartBeatOnAllWriters() {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().setHeartBeatNeeded();
        }
    }

    private HiveWriter getOrCreateWriter(HiveEndPoint hiveEndPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        try {
            HiveWriter hiveWriter = this.allWriters.get(hiveEndPoint);
            if (hiveWriter == null) {
                LOG.debug("Creating Writer to Hive end point : " + hiveEndPoint);
                hiveWriter = HiveUtils.makeHiveWriter(hiveEndPoint, this.callTimeoutPool, this.ugi, this.options);
                if (this.allWriters.size() > this.options.getMaxOpenConnections().intValue() && retireIdleWriters() == 0) {
                    retireEldestWriter();
                }
                this.allWriters.put(hiveEndPoint, hiveWriter);
            }
            return hiveWriter;
        } catch (HiveWriter.ConnectFailure e) {
            LOG.error("Failed to create HiveWriter for endpoint: " + hiveEndPoint, e);
            throw e;
        }
    }

    private void retireEldestWriter() {
        long currentTimeMillis = System.currentTimeMillis();
        HiveEndPoint hiveEndPoint = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() < currentTimeMillis) {
                hiveEndPoint = entry.getKey();
                currentTimeMillis = entry.getValue().getLastUsed();
            }
        }
        try {
            LOG.info("Closing least used Writer to Hive end point : " + hiveEndPoint);
            this.allWriters.remove(hiveEndPoint).close();
        } catch (IOException e) {
            LOG.warn("Failed to close writer for end point: " + hiveEndPoint, e);
        } catch (InterruptedException e2) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e2);
            Thread.currentThread().interrupt();
        }
    }

    private int retireIdleWriters() {
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (currentTimeMillis - entry.getValue().getLastUsed() > this.options.getIdleTimeout().intValue()) {
                i++;
                arrayList.add(entry.getKey());
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            HiveEndPoint hiveEndPoint = (HiveEndPoint) it.next();
            try {
                LOG.info("Closing idle Writer to Hive end point : {}", hiveEndPoint);
                this.allWriters.remove(hiveEndPoint).close();
            } catch (IOException e) {
                LOG.warn("Failed to close writer for end point: {}. Error: " + hiveEndPoint, e);
            } catch (InterruptedException e2) {
                LOG.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e2);
                Thread.currentThread().interrupt();
            }
        }
        return i;
    }
}
