package org.apache.hive.druid.io.druid.segment.realtime.firehose;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JacksonInject;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hive.druid.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Optional;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.io.CountingInputStream;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.concurrent.Execs;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.FirehoseFactory;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.data.input.impl.MapInputRowParser;
import org.apache.hive.druid.io.druid.guice.annotations.Json;
import org.apache.hive.druid.io.druid.guice.annotations.Smile;
import org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseRegister;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.class */
public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> {
    private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
    private static final int DEFAULT_BUFFER_SIZE = 100000;
    private final String serviceName;
    private final int bufferSize;
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    private final ObjectMapper jsonMapper;
    private final ObjectMapper smileMapper;
    private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;

    /* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory$EventReceiverFirehose.class */
    public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric {
        private final BlockingQueue<InputRow> buffer;
        private final MapInputRowParser parser;
        private final Object readLock = new Object();
        private volatile InputRow nextRow = null;
        private volatile boolean closed = false;
        private final AtomicLong bytesReceived = new AtomicLong(0);
        private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
        private final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");

        public EventReceiverFirehose(MapInputRowParser mapInputRowParser) {
            this.buffer = new ArrayBlockingQueue(EventReceiverFirehoseFactory.this.bufferSize);
            this.parser = mapInputRowParser;
        }

        @Path("/push-events")
        @Consumes({"application/json", "application/x-jackson-smile"})
        @POST
        @Produces({"application/json", "application/x-jackson-smile"})
        public Response addAll(InputStream inputStream, @Context HttpServletRequest httpServletRequest) {
            boolean equals = "application/x-jackson-smile".equals(httpServletRequest.getContentType());
            String str = equals ? "application/x-jackson-smile" : "application/json";
            ObjectMapper objectMapper = equals ? EventReceiverFirehoseFactory.this.smileMapper : EventReceiverFirehoseFactory.this.jsonMapper;
            CountingInputStream countingInputStream = new CountingInputStream(inputStream);
            try {
                try {
                    Collection collection = (Collection) objectMapper.readValue(countingInputStream, new TypeReference<Collection<Map<String, Object>>>() { // from class: org.apache.hive.druid.io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory.EventReceiverFirehose.1
                    });
                    this.bytesReceived.addAndGet(countingInputStream.getCount());
                    EventReceiverFirehoseFactory.log.debug("Adding %,d events to firehose: %s", Integer.valueOf(collection.size()), EventReceiverFirehoseFactory.this.serviceName);
                    ArrayList newArrayList = Lists.newArrayList();
                    Iterator it2 = collection.iterator();
                    while (it2.hasNext()) {
                        newArrayList.add(this.parser.parse((Map<String, Object>) it2.next()));
                    }
                    try {
                        addRows(newArrayList);
                        return Response.ok(objectMapper.writeValueAsString(ImmutableMap.of("eventCount", Integer.valueOf(collection.size()))), str).build();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw Throwables.propagate(e);
                    } catch (JsonProcessingException e2) {
                        throw Throwables.propagate(e2);
                    }
                } catch (IOException e3) {
                    Response build = Response.serverError().entity(ImmutableMap.of("error", e3.getMessage())).build();
                    this.bytesReceived.addAndGet(countingInputStream.getCount());
                    return build;
                }
            } catch (Throwable th) {
                this.bytesReceived.addAndGet(countingInputStream.getCount());
                throw th;
            }
        }

        @Override // org.apache.hive.druid.io.druid.data.input.Firehose
        public boolean hasMore() {
            boolean z;
            synchronized (this.readLock) {
                do {
                    try {
                        if (this.nextRow != null) {
                            break;
                        }
                        this.nextRow = this.buffer.poll(500L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw Throwables.propagate(e);
                    }
                } while (!this.closed);
                z = this.nextRow != null;
            }
            return z;
        }

        @Override // org.apache.hive.druid.io.druid.data.input.Firehose
        public InputRow nextRow() {
            InputRow inputRow;
            synchronized (this.readLock) {
                inputRow = this.nextRow;
                if (inputRow == null) {
                    throw new NoSuchElementException();
                }
                this.nextRow = null;
            }
            return inputRow;
        }

        @Override // org.apache.hive.druid.io.druid.data.input.Firehose
        public Runnable commit() {
            return new Runnable() { // from class: org.apache.hive.druid.io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory.EventReceiverFirehose.2
                @Override // java.lang.Runnable
                public void run() {
                }
            };
        }

        @Override // org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseMetric
        public int getCurrentBufferSize() {
            return this.buffer.size();
        }

        @Override // org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseMetric
        public int getCapacity() {
            return EventReceiverFirehoseFactory.this.bufferSize;
        }

        @Override // org.apache.hive.druid.io.druid.server.metrics.EventReceiverFirehoseMetric
        public long getBytesReceived() {
            return this.bytesReceived.get();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            EventReceiverFirehoseFactory.log.info("Firehose closing.", new Object[0]);
            this.closed = true;
            EventReceiverFirehoseFactory.this.eventReceiverFirehoseRegister.unregister(EventReceiverFirehoseFactory.this.serviceName);
            if (EventReceiverFirehoseFactory.this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider) EventReceiverFirehoseFactory.this.chatHandlerProvider.get()).unregister(EventReceiverFirehoseFactory.this.serviceName);
            }
            this.exec.shutdown();
        }

        public void addRows(Iterable<InputRow> iterable) throws InterruptedException {
            for (InputRow inputRow : iterable) {
                boolean z = false;
                while (!this.closed && !z) {
                    z = this.buffer.offer(inputRow, 500L, TimeUnit.MILLISECONDS);
                    if (!z) {
                        long currentTimeMillis = System.currentTimeMillis();
                        long j = this.lastBufferAddFailMsgTime.get();
                        if (currentTimeMillis - j > 10000 && this.lastBufferAddFailMsgTime.compareAndSet(j, currentTimeMillis)) {
                            EventReceiverFirehoseFactory.log.warn("Failed to add event to buffer with current size [%s] . Retrying...", Integer.valueOf(this.buffer.size()));
                        }
                    }
                }
                if (!z) {
                    throw new IllegalStateException("Cannot add events to closed firehose!");
                }
            }
        }

        @Path("/shutdown")
        @Consumes({"application/json", "application/x-jackson-smile"})
        @POST
        @Produces({"application/json", "application/x-jackson-smile"})
        public Response shutdown(@QueryParam("shutoffTime") String str) {
            try {
                DateTime now = str == null ? DateTime.now() : new DateTime(str);
                EventReceiverFirehoseFactory.log.info("Setting Firehose shutoffTime to %s", str);
                this.exec.schedule(new Runnable() { // from class: org.apache.hive.druid.io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory.EventReceiverFirehose.3
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            EventReceiverFirehose.this.close();
                        } catch (IOException e) {
                            EventReceiverFirehoseFactory.log.warn(e, "Failed to close delegate firehose, ignoring.", new Object[0]);
                        }
                    }
                }, now.getMillis() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return Response.ok().build();
            } catch (IllegalArgumentException e) {
                return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", e.getMessage())).build();
            }
        }

        @VisibleForTesting
        public boolean isClosed() {
            return this.closed;
        }
    }

    @JsonCreator
    public EventReceiverFirehoseFactory(@JsonProperty("serviceName") String str, @JsonProperty("bufferSize") Integer num, @JacksonInject ChatHandlerProvider chatHandlerProvider, @Json @JacksonInject ObjectMapper objectMapper, @JacksonInject @Smile ObjectMapper objectMapper2, @JacksonInject EventReceiverFirehoseRegister eventReceiverFirehoseRegister) {
        Preconditions.checkNotNull(str, "serviceName");
        this.serviceName = str;
        this.bufferSize = (num == null || num.intValue() <= 0) ? 100000 : num.intValue();
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
        this.jsonMapper = objectMapper;
        this.smileMapper = objectMapper2;
        this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
    }

    @Override // org.apache.hive.druid.io.druid.data.input.FirehoseFactory
    public Firehose connect(MapInputRowParser mapInputRowParser) throws IOException {
        log.info("Connecting firehose: %s", this.serviceName);
        EventReceiverFirehose eventReceiverFirehose = new EventReceiverFirehose(mapInputRowParser);
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chathandler of class[%s]", this.chatHandlerProvider.get().getClass().getName());
            this.chatHandlerProvider.get().register(this.serviceName, eventReceiverFirehose);
            if (this.serviceName.contains(":")) {
                this.chatHandlerProvider.get().register(this.serviceName.replaceAll(".*:", ""), eventReceiverFirehose);
            }
        } else {
            log.warn("No chathandler detected", new Object[0]);
        }
        this.eventReceiverFirehoseRegister.register(this.serviceName, eventReceiverFirehose);
        return eventReceiverFirehose;
    }

    @JsonProperty
    public String getServiceName() {
        return this.serviceName;
    }

    @JsonProperty
    public int getBufferSize() {
        return this.bufferSize;
    }
}
