package org.apache.flume.sink.elasticsearch.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.class */
public class ElasticSearchRestClient implements ElasticSearchClient {
    private static final String INDEX_OPERATION_NAME = "index";
    private static final String INDEX_PARAM = "_index";
    private static final String TYPE_PARAM = "_type";
    private static final String TTL_PARAM = "_ttl";
    private static final String BULK_ENDPOINT = "_bulk";
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
    private final ElasticSearchEventSerializer serializer;
    private final RoundRobinList<String> serversList;
    private StringBuilder bulkBuilder;
    private HttpClient httpClient;

    public ElasticSearchRestClient(String[] strArr, ElasticSearchEventSerializer elasticSearchEventSerializer) {
        for (int i = 0; i < strArr.length; i++) {
            if (!strArr[i].contains("http://") && !strArr[i].contains("https://")) {
                strArr[i] = "http://" + strArr[i];
            }
        }
        this.serializer = elasticSearchEventSerializer;
        this.serversList = new RoundRobinList<>(Arrays.asList(strArr));
        this.httpClient = new DefaultHttpClient();
        this.bulkBuilder = new StringBuilder();
    }

    @VisibleForTesting
    public ElasticSearchRestClient(String[] strArr, ElasticSearchEventSerializer elasticSearchEventSerializer, HttpClient httpClient) {
        this(strArr, elasticSearchEventSerializer);
        this.httpClient = httpClient;
    }

    public void configure(Context context) {
    }

    @Override // org.apache.flume.sink.elasticsearch.client.ElasticSearchClient
    public void close() {
    }

    @Override // org.apache.flume.sink.elasticsearch.client.ElasticSearchClient
    public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String str, long j) throws Exception {
        BytesReference bytes = this.serializer.mo1getContentBuilder(event).bytes();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
        hashMap2.put(TYPE_PARAM, str);
        if (j > 0) {
            hashMap2.put(TTL_PARAM, Long.toString(j));
        }
        hashMap.put(INDEX_OPERATION_NAME, hashMap2);
        Gson gson = new Gson();
        synchronized (this.bulkBuilder) {
            this.bulkBuilder.append(gson.toJson(hashMap));
            this.bulkBuilder.append("\n");
            this.bulkBuilder.append(bytes.toBytesArray().toUtf8());
            this.bulkBuilder.append("\n");
        }
    }

    @Override // org.apache.flume.sink.elasticsearch.client.ElasticSearchClient
    public void execute() throws Exception {
        String sb;
        int i = 0;
        int i2 = 0;
        HttpResponse httpResponse = null;
        logger.info("Sending bulk request to elasticsearch cluster");
        synchronized (this.bulkBuilder) {
            sb = this.bulkBuilder.toString();
            this.bulkBuilder = new StringBuilder();
        }
        while (i != 200 && i2 < this.serversList.size()) {
            i2++;
            HttpPost httpPost = new HttpPost(this.serversList.get() + "/" + BULK_ENDPOINT);
            httpPost.setEntity(new StringEntity(sb));
            httpResponse = this.httpClient.execute(httpPost);
            i = httpResponse.getStatusLine().getStatusCode();
            logger.info("Status code from elasticsearch: " + i);
            if (httpResponse.getEntity() != null) {
                logger.debug("Status message from elasticsearch: " + EntityUtils.toString(httpResponse.getEntity(), "UTF-8"));
            }
        }
        if (i != 200) {
            if (httpResponse.getEntity() == null) {
                throw new EventDeliveryException("Elasticsearch status code was: " + i);
            }
            throw new EventDeliveryException(EntityUtils.toString(httpResponse.getEntity(), "UTF-8"));
        }
    }
}
