package org.apache.ranger.tagsync.source.atlas;

import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.ranger.plugin.util.ServiceTags;
import org.apache.ranger.tagsync.model.AbstractTagSource;
import org.apache.ranger.tagsync.process.TagSyncConfig;
import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ranger/tagsync/source/atlas/AtlasTagSource.class */
public class AtlasTagSource extends AbstractTagSource {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasTagSource.class);
    public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "atlas-application.properties";
    public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
    public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
    public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
    public static final int MAX_WAIT_TIME_IN_MILLIS = 1000;
    private int maxBatchSize;
    private ConsumerRunnable consumerTask;
    private Thread myThread = null;

    /* loaded from: input_file:org/apache/ranger/tagsync/source/atlas/AtlasTagSource$ConsumerRunnable.class */
    private class ConsumerRunnable implements Runnable {
        private final NotificationConsumer<EntityNotification> consumer;
        private final List<RangerAtlasEntityWithTags> atlasEntitiesWithTags;
        private final List<AtlasKafkaMessage<EntityNotification>> messages;
        private long offsetOfLastMessageDeliveredToRanger;
        private long offsetOfLastMessageCommittedToKafka;
        private boolean isHandlingDeleteOps;

        private ConsumerRunnable(NotificationConsumer<EntityNotification> notificationConsumer) {
            this.atlasEntitiesWithTags = new ArrayList();
            this.messages = new ArrayList();
            this.offsetOfLastMessageDeliveredToRanger = -1L;
            this.offsetOfLastMessageCommittedToKafka = -1L;
            this.isHandlingDeleteOps = false;
            this.consumer = notificationConsumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (AtlasTagSource.LOG.isDebugEnabled()) {
                AtlasTagSource.LOG.debug("==> ConsumerRunnable.run()");
            }
            while (true) {
                try {
                    List<AtlasKafkaMessage<EntityNotification>> receive = this.consumer.receive(1000L);
                    if (receive.size() == 0) {
                        if (AtlasTagSource.LOG.isDebugEnabled()) {
                            AtlasTagSource.LOG.debug("AtlasTagSource.ConsumerRunnable.run: no message from NotificationConsumer within 1000 milliseconds");
                        }
                        if (CollectionUtils.isNotEmpty(this.atlasEntitiesWithTags)) {
                            buildAndUploadServiceTags();
                        }
                    } else {
                        for (AtlasKafkaMessage<EntityNotification> atlasKafkaMessage : receive) {
                            EntityNotification entityNotification = atlasKafkaMessage != null ? (EntityNotification) atlasKafkaMessage.getMessage() : null;
                            if (entityNotification != null) {
                                EntityNotificationWrapper entityNotificationWrapper = null;
                                try {
                                    entityNotificationWrapper = new EntityNotificationWrapper(entityNotification);
                                } catch (Throwable th) {
                                    AtlasTagSource.LOG.error("notification:[" + entityNotification + "] has some issues..perhaps null entity??", th);
                                }
                                if (entityNotificationWrapper != null) {
                                    if (AtlasTagSource.LOG.isDebugEnabled()) {
                                        AtlasTagSource.LOG.debug("Message-offset=" + atlasKafkaMessage.getOffset() + ", Notification=" + AtlasTagSource.getPrintableEntityNotification(entityNotificationWrapper));
                                    }
                                    RangerAtlasEntityWithTags rangerAtlasEntityWithTags = new RangerAtlasEntityWithTags(entityNotificationWrapper);
                                    if ((entityNotificationWrapper.getIsEntityDeleteOp() && !this.isHandlingDeleteOps) || (!entityNotificationWrapper.getIsEntityDeleteOp() && this.isHandlingDeleteOps)) {
                                        buildAndUploadServiceTags();
                                        this.isHandlingDeleteOps = !this.isHandlingDeleteOps;
                                    }
                                    this.atlasEntitiesWithTags.add(rangerAtlasEntityWithTags);
                                    this.messages.add(atlasKafkaMessage);
                                }
                            } else {
                                AtlasTagSource.LOG.error("Null entityNotification received from Kafka!! Ignoring..");
                            }
                        }
                        if (CollectionUtils.isNotEmpty(this.atlasEntitiesWithTags) && this.atlasEntitiesWithTags.size() >= AtlasTagSource.this.maxBatchSize) {
                            buildAndUploadServiceTags();
                        }
                    }
                } catch (Exception e) {
                    AtlasTagSource.LOG.error("Caught exception..: ", e);
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e2) {
                        AtlasTagSource.LOG.error("Interrupted: ", e2);
                        AtlasTagSource.LOG.error("Returning from thread. May cause process to be up but not processing events!!");
                        return;
                    }
                }
            }
        }

        private void buildAndUploadServiceTags() throws Exception {
            if (AtlasTagSource.LOG.isDebugEnabled()) {
                AtlasTagSource.LOG.debug("==> buildAndUploadServiceTags()");
            }
            commitToKafka();
            Map<String, ServiceTags> processAtlasEntities = AtlasNotificationMapper.processAtlasEntities(this.atlasEntitiesWithTags);
            if (MapUtils.isNotEmpty(processAtlasEntities)) {
                if (processAtlasEntities.size() != 1) {
                    AtlasTagSource.LOG.warn("Unexpected!! Notifications for more than one service received by AtlasTagSource.. Service-Names:[" + processAtlasEntities.keySet() + "]");
                }
                for (Map.Entry<String, ServiceTags> entry : processAtlasEntities.entrySet()) {
                    if (this.isHandlingDeleteOps) {
                        entry.getValue().setOp("delete");
                        entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
                        entry.getValue().setTags(Collections.EMPTY_MAP);
                    } else {
                        entry.getValue().setOp("add_or_update");
                    }
                    if (AtlasTagSource.LOG.isDebugEnabled()) {
                        AtlasTagSource.LOG.debug("serviceTags=" + new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create().toJson(entry.getValue()));
                    }
                    AtlasTagSource.this.updateSink(entry.getValue());
                }
                this.offsetOfLastMessageDeliveredToRanger = this.messages.get(this.messages.size() - 1).getOffset();
                if (AtlasTagSource.LOG.isDebugEnabled()) {
                    AtlasTagSource.LOG.debug("Completed processing batch of messages of size:[" + this.messages.size() + "] received from NotificationConsumer");
                }
                commitToKafka();
            }
            this.atlasEntitiesWithTags.clear();
            this.messages.clear();
            if (AtlasTagSource.LOG.isDebugEnabled()) {
                AtlasTagSource.LOG.debug("<== buildAndUploadServiceTags()");
            }
        }

        private void commitToKafka() {
            if (AtlasTagSource.LOG.isDebugEnabled()) {
                AtlasTagSource.LOG.debug("==> commitToKafka()");
            }
            for (AtlasKafkaMessage<EntityNotification> atlasKafkaMessage : this.messages) {
                if (atlasKafkaMessage.getOffset() > this.offsetOfLastMessageCommittedToKafka) {
                    if (atlasKafkaMessage.getOffset() > this.offsetOfLastMessageDeliveredToRanger) {
                        break;
                    }
                    TopicPartition topicPartition = new TopicPartition("ATLAS_ENTITIES", atlasKafkaMessage.getPartition());
                    try {
                        if (AtlasTagSource.LOG.isDebugEnabled()) {
                            AtlasTagSource.LOG.debug("Committing message with offset:[" + atlasKafkaMessage.getOffset() + "] to Kafka");
                        }
                        this.consumer.commit(topicPartition, atlasKafkaMessage.getOffset());
                        this.offsetOfLastMessageCommittedToKafka = atlasKafkaMessage.getOffset();
                    } catch (Exception e) {
                        AtlasTagSource.LOG.warn("Ranger tagsync already processed message at offset " + atlasKafkaMessage.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", e);
                        AtlasTagSource.LOG.warn("This will cause Kafka to deliver this message:[" + atlasKafkaMessage.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
                    }
                }
            }
            if (AtlasTagSource.LOG.isDebugEnabled()) {
                AtlasTagSource.LOG.debug("<== commitToKafka()");
            }
        }
    }

    @Override // org.apache.ranger.tagsync.model.TagSource
    public boolean initialize(Properties properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTagSource.initialize()");
        }
        Properties properties2 = new Properties();
        boolean initializeAtlasResourceMappers = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
        if (initializeAtlasResourceMappers) {
            InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
            try {
                if (resourceAsStream != null) {
                    try {
                        properties2.load(resourceAsStream);
                        try {
                            resourceAsStream.close();
                        } catch (IOException e) {
                            LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", e);
                        }
                    } catch (Exception e2) {
                        initializeAtlasResourceMappers = false;
                        LOG.error("Cannot load Atlas application properties file, file-name:atlas-application.properties", e2);
                        try {
                            resourceAsStream.close();
                        } catch (IOException e3) {
                            LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", e3);
                        }
                    }
                } else {
                    initializeAtlasResourceMappers = false;
                    LOG.error("Cannot find Atlas application properties file");
                }
            } catch (Throwable th) {
                try {
                    resourceAsStream.close();
                } catch (IOException e4) {
                    LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", e4);
                }
                throw th;
            }
        }
        if (initializeAtlasResourceMappers) {
            if (StringUtils.isBlank(properties2.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS))) {
                initializeAtlasResourceMappers = false;
                LOG.error("Value of property 'atlas.kafka.bootstrap.servers' is not specified!");
            }
            if (StringUtils.isBlank(properties2.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT))) {
                initializeAtlasResourceMappers = false;
                LOG.error("Value of property 'atlas.kafka.zookeeper.connect' is not specified!");
            }
            if (StringUtils.isBlank(properties2.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP))) {
                initializeAtlasResourceMappers = false;
                LOG.error("Value of property 'atlas.kafka.entities.group.id' is not specified!");
            }
        }
        if (initializeAtlasResourceMappers) {
            this.consumerTask = new ConsumerRunnable((NotificationConsumer) NotificationProvider.get().createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0));
        }
        this.maxBatchSize = TagSyncConfig.getSinkMaxBatchSize(properties);
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTagSource.initialize(), result=" + initializeAtlasResourceMappers);
        }
        return initializeAtlasResourceMappers;
    }

    @Override // org.apache.ranger.tagsync.model.TagSource
    public boolean start() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTagSource.start()");
        }
        if (this.consumerTask == null) {
            LOG.error("No consumerTask!!!");
        } else {
            this.myThread = new Thread(this.consumerTask);
            this.myThread.setDaemon(true);
            this.myThread.start();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTagSource.start()");
        }
        return this.myThread != null;
    }

    @Override // org.apache.ranger.tagsync.model.TagSource
    public void stop() {
        if (this.myThread == null || !this.myThread.isAlive()) {
            return;
        }
        this.myThread.interrupt();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getPrintableEntityNotification(EntityNotificationWrapper entityNotificationWrapper) {
        StringBuilder sb = new StringBuilder();
        sb.append("{ Notification-Type: ").append(entityNotificationWrapper.getOpType()).append(", ");
        sb.append(new RangerAtlasEntityWithTags(entityNotificationWrapper).toString());
        sb.append("}");
        return sb.toString();
    }
}
