package org.apache.flume.sink.http;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/flume/sink/http/HttpSink.class */
public class HttpSink extends AbstractSink implements Configurable {
    private static final Logger LOG = Logger.getLogger(HttpSink.class);
    private static final int HTTP_STATUS_CONTINUE = 100;
    private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
    private static final int DEFAULT_REQUEST_TIMEOUT = 5000;
    private static final String DEFAULT_CONTENT_TYPE = "text/plain";
    private static final String DEFAULT_ACCEPT_HEADER = "text/plain";
    private URL endpointUrl;
    private SinkCounter sinkCounter;
    private boolean defaultBackoff;
    private boolean defaultRollback;
    private boolean defaultIncrementMetrics;
    private ConnectionBuilder connectionBuilder;
    private int connectTimeout = 5000;
    private int requestTimeout = 5000;
    private String contentTypeHeader = "text/plain";
    private String acceptHeader = "text/plain";
    private HashMap<String, Boolean> backoffOverrides = new HashMap<>();
    private HashMap<String, Boolean> rollbackOverrides = new HashMap<>();
    private HashMap<String, Boolean> incrementMetricsOverrides = new HashMap<>();

    /* loaded from: input_file:org/apache/flume/sink/http/HttpSink$ConnectionBuilder.class */
    class ConnectionBuilder {
        ConnectionBuilder() {
        }

        public HttpURLConnection getConnection() throws IOException {
            HttpURLConnection httpURLConnection = (HttpURLConnection) HttpSink.this.endpointUrl.openConnection();
            httpURLConnection.setRequestMethod("POST");
            httpURLConnection.setRequestProperty("Content-Type", HttpSink.this.contentTypeHeader);
            httpURLConnection.setRequestProperty("Accept", HttpSink.this.acceptHeader);
            httpURLConnection.setConnectTimeout(HttpSink.this.connectTimeout);
            httpURLConnection.setReadTimeout(HttpSink.this.requestTimeout);
            httpURLConnection.setDoOutput(true);
            httpURLConnection.setDoInput(true);
            httpURLConnection.connect();
            return httpURLConnection;
        }
    }

    public final void configure(Context context) {
        String string = context.getString("endpoint", "");
        LOG.info("Read endpoint URL from configuration : " + string);
        try {
            this.endpointUrl = new URL(string);
            this.connectTimeout = context.getInteger("connectTimeout", 5000).intValue();
            if (this.connectTimeout <= 0) {
                throw new IllegalArgumentException("Connect timeout must be a non-zero and positive");
            }
            LOG.info("Using connect timeout : " + this.connectTimeout);
            this.requestTimeout = context.getInteger("requestTimeout", 5000).intValue();
            if (this.requestTimeout <= 0) {
                throw new IllegalArgumentException("Request timeout must be a non-zero and positive");
            }
            LOG.info("Using request timeout : " + this.requestTimeout);
            this.acceptHeader = context.getString("acceptHeader", "text/plain");
            LOG.info("Using Accept header value : " + this.acceptHeader);
            this.contentTypeHeader = context.getString("contentTypeHeader", "text/plain");
            LOG.info("Using Content-Type header value : " + this.contentTypeHeader);
            this.defaultBackoff = context.getBoolean("defaultBackoff", true).booleanValue();
            LOG.info("Channel backoff by default is " + this.defaultBackoff);
            this.defaultRollback = context.getBoolean("defaultRollback", true).booleanValue();
            LOG.info("Transaction rollback by default is " + this.defaultRollback);
            this.defaultIncrementMetrics = context.getBoolean("defaultIncrementMetrics", false).booleanValue();
            LOG.info("Incrementing metrics by default is " + this.defaultIncrementMetrics);
            parseConfigOverrides("backoff", context, this.backoffOverrides);
            parseConfigOverrides("rollback", context, this.rollbackOverrides);
            parseConfigOverrides("incrementMetrics", context, this.incrementMetricsOverrides);
            if (this.sinkCounter == null) {
                this.sinkCounter = new SinkCounter(getName());
            }
            this.connectionBuilder = new ConnectionBuilder();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Endpoint URL invalid", e);
        }
    }

    public final void start() {
        LOG.info("Starting HttpSink");
        this.sinkCounter.start();
    }

    public final void stop() {
        LOG.info("Stopping HttpSink");
        this.sinkCounter.stop();
    }

    public final Sink.Status process() throws EventDeliveryException {
        Sink.Status status;
        OutputStream outputStream = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                byte[] bArr = null;
                if (take != null) {
                    bArr = take.getBody();
                }
                if (bArr == null || bArr.length <= 0) {
                    transaction.commit();
                    status = Sink.Status.BACKOFF;
                    LOG.warn("Processed empty event");
                } else {
                    this.sinkCounter.incrementEventDrainAttemptCount();
                    LOG.debug("Sending request : " + new String(take.getBody()));
                    try {
                        HttpURLConnection connection = this.connectionBuilder.getConnection();
                        outputStream = connection.getOutputStream();
                        outputStream.write(bArr);
                        outputStream.flush();
                        outputStream.close();
                        int responseCode = connection.getResponseCode();
                        LOG.debug("Got status code : " + responseCode);
                        if (responseCode < 400) {
                            connection.getInputStream().close();
                        } else {
                            LOG.debug("bad request");
                            connection.getErrorStream().close();
                        }
                        LOG.debug("Response processed and closed");
                        if (responseCode >= HTTP_STATUS_CONTINUE) {
                            String valueOf = String.valueOf(responseCode);
                            boolean findOverrideValue = findOverrideValue(valueOf, this.rollbackOverrides, this.defaultRollback);
                            if (findOverrideValue) {
                                transaction.rollback();
                            } else {
                                transaction.commit();
                            }
                            boolean findOverrideValue2 = findOverrideValue(valueOf, this.backoffOverrides, this.defaultBackoff);
                            status = findOverrideValue2 ? Sink.Status.BACKOFF : Sink.Status.READY;
                            if (findOverrideValue(valueOf, this.incrementMetricsOverrides, this.defaultIncrementMetrics)) {
                                this.sinkCounter.incrementEventDrainSuccessCount();
                            }
                            if (findOverrideValue) {
                                if (findOverrideValue2) {
                                    LOG.info(String.format("Got status code %d from HTTP server. Rolled back event and backed off.", Integer.valueOf(responseCode)));
                                } else {
                                    LOG.info(String.format("Got status code %d from HTTP server. Rolled back event for retry.", Integer.valueOf(responseCode)));
                                }
                            }
                        } else {
                            transaction.rollback();
                            status = Sink.Status.BACKOFF;
                            LOG.warn("Malformed response returned from server, retrying");
                        }
                    } catch (IOException e) {
                        transaction.rollback();
                        status = Sink.Status.BACKOFF;
                        LOG.error("Error opening connection, or request timed out", e);
                    }
                }
                transaction.close();
                if (outputStream != null) {
                    try {
                        outputStream.close();
                    } catch (IOException e2) {
                    }
                }
            } catch (Throwable th) {
                transaction.rollback();
                status = Sink.Status.BACKOFF;
                LOG.error("Error sending HTTP request, retrying", th);
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                transaction.close();
                if (0 != 0) {
                    try {
                        outputStream.close();
                    } catch (IOException e3) {
                    }
                }
            }
            return status;
        } catch (Throwable th2) {
            transaction.close();
            if (0 != 0) {
                try {
                    outputStream.close();
                } catch (IOException e4) {
                }
            }
            throw th2;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void parseConfigOverrides(String str, Context context, Map<String, Boolean> map) {
        ImmutableMap subProperties = context.getSubProperties(str + ".");
        if (subProperties != null) {
            UnmodifiableIterator it = subProperties.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                LOG.info(String.format("Read %s value for status code %s as %s", str, entry.getKey(), entry.getValue()));
                if (map.containsKey(entry.getKey())) {
                    LOG.warn(String.format("Ignoring duplicate config value for %s.%s", str, entry.getKey()));
                } else {
                    map.put(entry.getKey(), Boolean.valueOf((String) entry.getValue()));
                }
            }
        }
    }

    private boolean findOverrideValue(String str, HashMap<String, Boolean> hashMap, boolean z) {
        Boolean bool = hashMap.get(str);
        if (bool == null) {
            bool = hashMap.get(str.substring(0, 1) + "XX");
            if (bool == null) {
                bool = Boolean.valueOf(z);
            }
        }
        return bool.booleanValue();
    }

    final void setConnectionBuilder(ConnectionBuilder connectionBuilder) {
        this.connectionBuilder = connectionBuilder;
    }

    final void setSinkCounter(SinkCounter sinkCounter) {
        this.sinkCounter = sinkCounter;
    }
}
