/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.firehose;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JacksonInject;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hive.druid.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.io.CountingInputStream;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.FirehoseFactory;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.data.input.impl.InputRowParser;
import org.apache.hive.druid.io.druid.guice.annotations.Json;
import org.apache.hive.druid.io.druid.guice.annotations.Smile;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.io.druid.java.util.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.segment.realtime.firehose.ChatHandler;
import org.apache.hive.druid.io.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.hive.druid.io.druid.server.security.Access;
import org.apache.hive.druid.io.druid.server.security.Action;
import org.apache.hive.druid.io.druid.server.security.AuthorizationUtils;
import org.apache.hive.druid.io.druid.server.security.AuthorizerMapper;
import org.apache.hive.druid.io.druid.server.security.Resource;
import org.apache.hive.druid.io.druid.server.security.ResourceAction;
import org.apache.hive.druid.io.druid.server.security.ResourceType;
import org.joda.time.DateTime;

/**
 * {@link FirehoseFactory} producing {@link EventReceiverFirehose} instances: firehoses whose rows are
 * pushed in over HTTP ({@code POST /push-events}) and parked in a bounded in-memory queue until the
 * consumer pulls them out through {@code hasMore()} / {@code nextRow()}.
 *
 * <p>Each connected firehose is registered with the {@link EventReceiverFirehoseRegister} and, when a
 * {@link ChatHandlerProvider} was injected, with that provider as well.
 */
public class EventReceiverFirehoseFactory
        implements FirehoseFactory<InputRowParser<Map<String, Object>>> {
    /** Maximum number of distinct producer ids for which a single firehose tracks sequence numbers. */
    public static final int MAX_FIREHOSE_PRODUCERS = 10000;
    private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
    /** Queue capacity used when {@code bufferSize} is absent or not positive. */
    private static final int DEFAULT_BUFFER_SIZE = 100000;
    /** How long a single queue offer/poll waits before the closed flag is re-checked. */
    private static final long QUEUE_WAIT_MILLIS = 500L;
    /** Minimum spacing between two "buffer full" warnings. */
    private static final long BUFFER_FULL_WARN_INTERVAL_MILLIS = 10000L;

    private final String serviceName;
    private final int bufferSize;
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    private final ObjectMapper jsonMapper;
    private final ObjectMapper smileMapper;
    private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
    private final AuthorizerMapper authorizerMapper;

    /**
     * @param serviceName name the firehose is registered under; must not be null
     * @param bufferSize capacity of the row queue; {@code null} or a non-positive value selects
     *                   {@link #DEFAULT_BUFFER_SIZE}
     * @param chatHandlerProvider provider the firehose is registered with; may be null, in which case
     *                            chat-handler registration is skipped
     * @param jsonMapper mapper used for JSON request/response bodies
     * @param smileMapper mapper used for Smile request/response bodies
     * @param eventReceiverFirehoseRegister register the firehose is added to on connect and removed from on close
     * @param authorizerMapper used to authorize the HTTP endpoints of the firehose
     */
    @JsonCreator
    public EventReceiverFirehoseFactory(@JsonProperty(value="serviceName") String serviceName, @JsonProperty(value="bufferSize") Integer bufferSize, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject @Json ObjectMapper jsonMapper, @JacksonInject @Smile ObjectMapper smileMapper, @JacksonInject EventReceiverFirehoseRegister eventReceiverFirehoseRegister, @JacksonInject AuthorizerMapper authorizerMapper) {
        Preconditions.checkNotNull(serviceName, "serviceName");
        this.serviceName = serviceName;
        this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
        this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
        this.jsonMapper = jsonMapper;
        this.smileMapper = smileMapper;
        this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
        this.authorizerMapper = authorizerMapper;
    }

    /**
     * Creates a new firehose and registers it under {@code serviceName} (and, if the name contains a
     * {@code ':'}, additionally under the part after the last colon) with the chat handler provider,
     * then with the firehose register.
     *
     * @param firehoseParser parser applied to every pushed event
     * @param temporaryDirectory not used by this implementation
     * @return the newly created, registered firehose
     */
    @Override
    public Firehose connect(InputRowParser<Map<String, Object>> firehoseParser, File temporaryDirectory) throws IOException {
        log.info("Connecting firehose: %s", this.serviceName);
        final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
        if (this.chatHandlerProvider.isPresent()) {
            final ChatHandlerProvider provider = this.chatHandlerProvider.get();
            log.info("Found chathandler of class[%s]", provider.getClass().getName());
            provider.register(this.serviceName, firehose);
            final String alias = this.shortServiceName();
            if (alias != null) {
                provider.register(alias, firehose);
            }
        } else {
            log.warn("No chathandler detected");
        }
        this.eventReceiverFirehoseRegister.register(this.serviceName, firehose);
        return firehose;
    }

    @JsonProperty
    public String getServiceName() {
        return this.serviceName;
    }

    @JsonProperty
    public int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * Secondary chat-handler name: everything after the last {@code ':'} of the service name, or
     * {@code null} when the service name contains no colon. Shared by {@link #connect} and
     * {@code EventReceiverFirehose.close()} so that whatever gets registered also gets unregistered.
     */
    @Nullable
    private String shortServiceName() {
        return this.serviceName.contains(":") ? this.serviceName.replaceAll(".*:", "") : null;
    }

    /**
     * Firehose backed by a bounded blocking queue. HTTP callers append rows via {@link #addAll}; the
     * consumer drains them via {@link #hasMore()} and {@link #nextRow()}. Closing can be requested
     * over HTTP via {@link #shutdown}.
     */
    public class EventReceiverFirehose
            implements ChatHandler,
            Firehose,
            EventReceiverFirehoseMetric {
        /** Runs the delayed close scheduled by {@link #shutdown}. */
        private final ScheduledExecutorService exec;
        private final BlockingQueue<InputRow> buffer;
        private final InputRowParser<Map<String, Object>> parser;
        /** Guards the hand-over of {@link #nextRow} between {@link #hasMore()} and {@link #nextRow()}. */
        private final Object readLock = new Object();
        /** Row staged by {@link #hasMore()} and not yet handed out by {@link #nextRow()}. */
        private volatile InputRow nextRow = null;
        private volatile boolean closed = false;
        /** Total number of request-body bytes read by {@link #addAll}, including failed parses. */
        private final AtomicLong bytesReceived = new AtomicLong(0L);
        /** Wall-clock millis of the last "buffer full" warning; used to rate-limit that warning. */
        private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0L);
        /** Highest accepted sequence number per producer id. */
        private final ConcurrentMap<String, Long> producerSequences = new ConcurrentHashMap<String, Long>();

        public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) {
            this.buffer = new ArrayBlockingQueue<InputRow>(EventReceiverFirehoseFactory.this.bufferSize);
            this.parser = parser;
            this.exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
        }

        /**
         * Accepts a batch of events (a JSON or Smile collection of maps), parses them into rows and
         * appends the rows to the queue, blocking while the queue is full.
         *
         * @param in request body
         * @param req request, used for authorization, content type and the producer headers
         * @return 403 when the caller is not authorized; the response chosen by the producer-sequence
         *         check when it rejects or skips the batch; a server error carrying the exception
         *         message when the body cannot be read; otherwise 200 with {@code {"eventCount": n}}
         * @throws IllegalStateException if the firehose is closed before every row was queued
         */
        @POST
        @Path(value="/push-events")
        @Consumes(value={"application/json", "application/x-jackson-smile"})
        @Produces(value={"application/json", "application/x-jackson-smile"})
        public Response addAll(InputStream in, @Context HttpServletRequest req) {
            if (!this.isStateWriteAllowed(req)) {
                return Response.status(403).build();
            }
            // Anything that is not exactly the Smile media type is handled as JSON.
            final boolean isSmile = "application/x-jackson-smile".equals(req.getContentType());
            final String contentType = isSmile ? "application/x-jackson-smile" : "application/json";
            final ObjectMapper objectMapper = isSmile ? EventReceiverFirehoseFactory.this.smileMapper : EventReceiverFirehoseFactory.this.jsonMapper;

            // Reply with the same normalised content type as the success path below, rather than
            // echoing the raw (possibly null or parameterised) request header.
            final Optional<Response> producerSequenceResponse = this.checkProducerSequence(req, contentType, objectMapper);
            if (producerSequenceResponse.isPresent()) {
                return producerSequenceResponse.get();
            }

            final CountingInputStream countingInputStream = new CountingInputStream(in);
            final Collection<Map<String, Object>> events;
            try {
                events = objectMapper.readValue(countingInputStream, new TypeReference<Collection<Map<String, Object>>>(){});
            }
            catch (IOException e) {
                return Response.serverError().entity(ImmutableMap.of("error", e.getMessage())).build();
            }
            finally {
                // Count what was consumed even when parsing failed part-way through.
                this.bytesReceived.addAndGet(countingInputStream.getCount());
            }

            log.debug("Adding %,d events to firehose: %s", events.size(), EventReceiverFirehoseFactory.this.serviceName);
            final ArrayList<InputRow> rows = new ArrayList<InputRow>();
            for (Map<String, Object> event : events) {
                rows.addAll(this.parser.parseBatch(event));
            }
            try {
                this.addRows(rows);
                return Response.ok(objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())), contentType).build();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
            catch (JsonProcessingException e) {
                throw Throwables.propagate(e);
            }
        }

        /**
         * Waits until a row can be staged for {@link #nextRow()} or the firehose has been closed.
         * Once closed, each call still polls the queue once, so rows queued before the close can
         * be drained.
         *
         * @return {@code true} if a row is staged, {@code false} if the firehose is closed and no row
         *         arrived during the last poll
         */
        @Override
        public boolean hasMore() {
            synchronized (this.readLock) {
                try {
                    while (this.nextRow == null) {
                        this.nextRow = this.buffer.poll(QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                        if (this.closed) {
                            break;
                        }
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
                return this.nextRow != null;
            }
        }

        /**
         * Hands out the row staged by the preceding {@link #hasMore()} call and clears the stage.
         *
         * @throws NoSuchElementException if no row is currently staged
         */
        @Override
        @Nullable
        public InputRow nextRow() {
            synchronized (this.readLock) {
                final InputRow row = this.nextRow;
                if (row == null) {
                    throw new NoSuchElementException();
                }
                this.nextRow = null;
                return row;
            }
        }

        /** Returns a no-op runnable. */
        @Override
        public Runnable commit() {
            return new Runnable(){

                @Override
                public void run() {
                }
            };
        }

        @Override
        public int getCurrentBufferSize() {
            return this.buffer.size();
        }

        @Override
        public int getCapacity() {
            return EventReceiverFirehoseFactory.this.bufferSize;
        }

        @Override
        public long getBytesReceived() {
            return this.bytesReceived.get();
        }

        /**
         * Marks the firehose closed, removes every registration made by
         * {@link EventReceiverFirehoseFactory#connect} and shuts the scheduler down. Calls after the
         * first one are no-ops.
         */
        @Override
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            log.info("Firehose closing.");
            this.closed = true;
            EventReceiverFirehoseFactory.this.eventReceiverFirehoseRegister.unregister(EventReceiverFirehoseFactory.this.serviceName);
            if (EventReceiverFirehoseFactory.this.chatHandlerProvider.isPresent()) {
                final ChatHandlerProvider provider = EventReceiverFirehoseFactory.this.chatHandlerProvider.get();
                provider.unregister(EventReceiverFirehoseFactory.this.serviceName);
                // connect() also registers the short alias; drop it too so it does not outlive the firehose.
                final String alias = EventReceiverFirehoseFactory.this.shortServiceName();
                if (alias != null) {
                    provider.unregister(alias);
                }
            }
            this.exec.shutdown();
        }

        /**
         * Appends the rows to the queue in order, retrying each one until it fits or the firehose is
         * closed. While the queue stays full a warning is logged at most once per
         * {@link #BUFFER_FULL_WARN_INTERVAL_MILLIS}.
         *
         * @throws IllegalStateException if the firehose is closed before a row could be queued
         * @throws InterruptedException if interrupted while waiting for queue space
         */
        public void addRows(Iterable<InputRow> rows) throws InterruptedException {
            for (InputRow row : rows) {
                boolean added = false;
                while (!this.closed && !added) {
                    added = this.buffer.offer(row, QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                    if (!added) {
                        final long currTime = System.currentTimeMillis();
                        final long lastTime = this.lastBufferAddFailMsgTime.get();
                        // The compare-and-set lets only one of several blocked callers log per interval.
                        if (currTime - lastTime > BUFFER_FULL_WARN_INTERVAL_MILLIS
                                && this.lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
                            log.warn("Failed to add event to buffer with current size [%s] . Retrying...", this.buffer.size());
                        }
                    }
                }
                if (!added) {
                    throw new IllegalStateException("Cannot add events to closed firehose!");
                }
            }
        }

        /**
         * Schedules {@link #close()} to run at the given time.
         *
         * @param shutoffTime time at which to close, in a format accepted by {@code DateTimes.of};
         *                    {@code null} means now
         * @param req request, used for authorization
         * @return 403 when the caller is not authorized, 400 when {@code shutoffTime} is rejected with
         *         an {@link IllegalArgumentException}, otherwise 200
         */
        @POST
        @Path(value="/shutdown")
        @Consumes(value={"application/json", "application/x-jackson-smile"})
        @Produces(value={"application/json", "application/x-jackson-smile"})
        public Response shutdown(@QueryParam(value="shutoffTime") String shutoffTime, @Context HttpServletRequest req) {
            if (!this.isStateWriteAllowed(req)) {
                return Response.status(403).build();
            }
            try {
                final DateTime shutoffAt = shutoffTime == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTime);
                // Log the resolved instant; the raw parameter is null when the caller omitted it.
                log.info("Setting Firehose shutoffTime to %s", shutoffAt);
                this.exec.schedule(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            EventReceiverFirehose.this.close();
                        }
                        catch (IOException e) {
                            log.warn(e, "Failed to close delegate firehose, ignoring.");
                        }
                    }
                }, shutoffAt.getMillis() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return Response.ok().build();
            }
            catch (IllegalArgumentException e) {
                return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", e.getMessage())).build();
            }
        }

        @VisibleForTesting
        public boolean isClosed() {
            return this.closed;
        }

        /** Returns whether the request is authorized for the WRITE action on the STATE resource. */
        private boolean isStateWriteAllowed(HttpServletRequest req) {
            final Access accessResult = AuthorizationUtils.authorizeResourceAction(req, new ResourceAction(new Resource("STATE", ResourceType.STATE), Action.WRITE), EventReceiverFirehoseFactory.this.authorizerMapper);
            return accessResult.isAllowed();
        }

        /**
         * Applies the optional producer-sequence protocol carried in the
         * {@code X-Firehose-Producer-Id} / {@code X-Firehose-Producer-Seq} headers.
         *
         * <p>The limit on tracked producers is enforced only for producer ids that are not tracked yet
         * and before anything is stored, so reaching the limit neither locks out known producers nor
         * lets the map grow past the limit through rejected requests.
         *
         * @param req request carrying the producer headers
         * @param responseContentType content type of the "skipped" response
         * @param responseMapper mapper used to serialise the "skipped" response
         * @return a response to send back immediately (400 for a missing or non-numeric sequence, 403
         *         when a new producer would exceed {@link EventReceiverFirehoseFactory#MAX_FIREHOSE_PRODUCERS},
         *         200 with {@code "skipped": true} when the sequence is not greater than the last
         *         accepted one), or empty when the batch should be processed
         */
        private Optional<Response> checkProducerSequence(HttpServletRequest req, String responseContentType, ObjectMapper responseMapper) {
            final String producerId = req.getHeader("X-Firehose-Producer-Id");
            if (producerId == null) {
                // Caller does not take part in sequence tracking.
                return Optional.empty();
            }
            final String sequenceValue = req.getHeader("X-Firehose-Producer-Seq");
            if (sequenceValue == null) {
                return Optional.of(Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "Producer sequence value is missing")).build());
            }

            Long lastSequence = this.producerSequences.get(producerId);
            if (lastSequence == null) {
                if (this.producerSequences.size() >= MAX_FIREHOSE_PRODUCERS) {
                    return Optional.of(Response.status(Response.Status.FORBIDDEN).entity(ImmutableMap.of("error", "Too many individual producer IDs for this firehose.  Max is " + MAX_FIREHOSE_PRODUCERS)).build());
                }
                // A producer seen for the first time starts below every sequence except Long.MIN_VALUE itself.
                lastSequence = Long.MIN_VALUE;
            }

            final long newSequence;
            try {
                newSequence = Long.parseLong(sequenceValue);
            }
            catch (NumberFormatException ex) {
                return Optional.of(Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "Producer sequence must be a number")).build());
            }

            if (newSequence <= lastSequence) {
                try {
                    return Optional.of(Response.ok(responseMapper.writeValueAsString(ImmutableMap.of("eventCount", 0, "skipped", true)), responseContentType).build());
                }
                catch (JsonProcessingException ex) {
                    throw Throwables.propagate(ex);
                }
            }
            this.producerSequences.put(producerId, newSequence);
            return Optional.empty();
        }
    }
}

