package org.apache.hive.druid.io.druid.java.util.emitter.core;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hive.druid.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Charsets;
import org.apache.hive.druid.com.google.common.io.BaseEncoding;
import org.apache.hive.druid.io.druid.client.CachingClusteredClientTest;
import org.apache.hive.druid.io.druid.java.util.common.CompressionUtils;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.hive.druid.io.druid.java.util.emitter.core.HttpEmitterConfig;
import org.apache.hive.druid.io.druid.java.util.emitter.service.UnitEvent;
import org.apache.hive.druid.io.netty.buffer.Unpooled;
import org.apache.hive.druid.io.netty.channel.Channel;
import org.apache.hive.druid.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.hive.druid.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.hive.druid.io.netty.handler.codec.http.HttpVersion;
import org.apache.hive.druid.org.apache.calcite.sql.parser.parserextensiontesting.ExtensionSqlParserImplConstants;
import org.apache.tools.ant.taskdefs.Execute;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.netty.EagerResponseBodyPart;
import org.asynchttpclient.netty.NettyResponseStatus;
import org.asynchttpclient.uri.Uri;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hive/druid/io/druid/java/util/emitter/core/EmitterTest.class */
public class EmitterTest {
    /** JSON mapper passed to every emitter built here and used to render the expected request bodies. */
    private static final ObjectMapper jsonMapper = new ObjectMapper();
    /** URL the emitters are configured with; request handlers assert that requests target it. */
    public static String TARGET_URL = "http://metrics.foo.bar/";
    /** Canned response with status CREATED and the UTF-8 body "Yay"; exposed through {@link #okResponse()}. */
    public static final Response OK_RESPONSE = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED).accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Yay".getBytes(StandardCharsets.UTF_8)), true)).build();
    /** Mock HTTP client; a fresh instance is created in {@link #setUp()} before each test. */
    MockHttpClient httpClient;
    /** Emitter under test; {@link #tearDown()} closes it when it is non-null. */
    HttpPostEmitter emitter;

    /**
     * Creates a response builder pre-populated with a status line for {@link #TARGET_URL}.
     *
     * <p>Decompiler note (JADX): the access modifier of this method was changed from
     * {@code private}; it is kept {@code public} here so existing references keep working.
     *
     * @param httpVersion        HTTP version of the synthetic response
     * @param httpResponseStatus HTTP status of the synthetic response
     * @return a builder that has accumulated the status; callers may add body parts before
     *         calling {@code build()}
     */
    public static Response.ResponseBuilder responseBuilder(HttpVersion httpVersion, HttpResponseStatus httpResponseStatus) {
        DefaultHttpResponse nettyResponse = new DefaultHttpResponse(httpVersion, httpResponseStatus);
        // No real connection backs this response, hence the null channel.
        NettyResponseStatus status = new NettyResponseStatus(
                Uri.create(TARGET_URL),
                new DefaultAsyncHttpClientConfig.Builder().build(),
                nettyResponse,
                (Channel) null);
        Response.ResponseBuilder builder = new Response.ResponseBuilder();
        return builder.accumulate(status);
    }

    /**
     * Returns the shared successful response used by the request handlers in this test.
     *
     * @return {@link #OK_RESPONSE}; the same instance on every call
     */
    public static Response okResponse() {
        return EmitterTest.OK_RESPONSE;
    }

    /** Installs a fresh {@link MockHttpClient} before each test. */
    @Before
    public void setUp() throws Exception {
        httpClient = new MockHttpClient();
    }

    /** Closes the emitter created by the test, if the test created one. */
    @After
    public void tearDown() throws Exception {
        if (emitter == null) {
            // The test never assigned an emitter, so there is nothing to release.
            return;
        }
        emitter.close();
    }

    /**
     * Builds and starts an emitter intended to flush on a timer only.
     *
     * <p>The flush count is set to {@code Integer.MAX_VALUE}. The decompiled code obtained that
     * value from Ant's unrelated {@code Execute.INVALID} constant; the literal intent is spelled
     * out here so the helper does not depend on Ant.
     *
     * @param flushMillis flush interval, in milliseconds, handed to the emitter configuration
     * @return the started emitter
     */
    private HttpPostEmitter timeBasedEmitter(long flushMillis) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
                .setFlushMillis(flushMillis)
                .setFlushCount(Integer.MAX_VALUE)
                .build();
        HttpPostEmitter started = new HttpPostEmitter(config, this.httpClient, jsonMapper);
        started.start();
        return started;
    }

    /**
     * Builds and starts an emitter configured with the given flush count and a flush interval of
     * {@code Long.MAX_VALUE} milliseconds.
     *
     * @param flushCount value handed to {@code setFlushCount}
     * @return the started emitter
     */
    private HttpPostEmitter sizeBasedEmitter(int flushCount) {
        HttpEmitterConfig.Builder builder = new HttpEmitterConfig.Builder(TARGET_URL);
        builder.setFlushMillis(Long.MAX_VALUE);
        builder.setFlushCount(flushCount);
        HttpPostEmitter started = new HttpPostEmitter(builder.build(), httpClient, jsonMapper);
        started.start();
        return started;
    }

    /**
     * Builds and starts a size-based emitter through the property-driven {@code Emitters.create}
     * factory instead of calling the {@link HttpPostEmitter} constructor directly.
     *
     * <p>The factory result is held as the general {@link Emitter} type so that the
     * {@code instanceof} assertion is a real check: if the factory produced another emitter kind,
     * the test fails with the descriptive message rather than at the cast.
     *
     * @param flushCount value written to the {@code flushCount} property
     * @return the started emitter, verified to be an {@link HttpPostEmitter}
     */
    private HttpPostEmitter sizeBasedEmitterGeneralizedCreation(int flushCount) {
        Properties props = new Properties();
        props.setProperty("org.apache.hive.druid.io.druid.java.util.emitter.type", "http");
        props.setProperty("org.apache.hive.druid.io.druid.java.util.emitter.recipientBaseUrl", TARGET_URL);
        props.setProperty("org.apache.hive.druid.io.druid.java.util.emitter.flushMillis", String.valueOf(Long.MAX_VALUE));
        props.setProperty("org.apache.hive.druid.io.druid.java.util.emitter.flushCount", String.valueOf(flushCount));

        Emitter created = Emitters.create(props, this.httpClient, jsonMapper, new Lifecycle());
        Assert.assertTrue(
                StringUtils.format("HttpPostEmitter emitter should be created, but found %s", created.getClass().getName()),
                created instanceof HttpPostEmitter);

        HttpPostEmitter httpPostEmitter = (HttpPostEmitter) created;
        httpPostEmitter.start();
        return httpPostEmitter;
    }

    /**
     * Builds and starts an emitter configured with the given flush count and content encoding and
     * a flush interval of {@code Long.MAX_VALUE} milliseconds.
     *
     * @param flushCount      value handed to {@code setFlushCount}
     * @param contentEncoding encoding handed to {@code setContentEncoding}
     * @return the started emitter
     */
    private HttpPostEmitter sizeBasedEmitterWithContentEncoding(int flushCount, ContentEncoding contentEncoding) {
        HttpEmitterConfig.Builder builder = new HttpEmitterConfig.Builder(TARGET_URL);
        builder.setFlushMillis(Long.MAX_VALUE);
        builder.setFlushCount(flushCount);
        builder.setContentEncoding(contentEncoding);
        HttpPostEmitter started = new HttpPostEmitter(builder.build(), httpClient, jsonMapper);
        started.start();
        return started;
    }

    /**
     * Builds and starts an emitter that uses basic authentication, the
     * {@link BatchingStrategy#NEWLINES} batching strategy and a maximum batch size of 1 MiB.
     *
     * <p>Both the flush interval and the flush count are set to their maximum values; the tests
     * using this helper call {@code flush()} explicitly. The flush count previously came from
     * Ant's unrelated {@code Execute.INVALID} constant, which is replaced here by
     * {@code Integer.MAX_VALUE}.
     *
     * @param basicAuthentication credentials string handed to {@code setBasicAuthentication}
     * @return the started emitter
     */
    private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(String basicAuthentication) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
                .setFlushMillis(Long.MAX_VALUE)
                .setFlushCount(Integer.MAX_VALUE)
                .setBasicAuthentication(basicAuthentication)
                .setBatchingStrategy(BatchingStrategy.NEWLINES)
                .setMaxBatchSize(1024 * 1024)
                .build();
        HttpPostEmitter started = new HttpPostEmitter(config, this.httpClient, jsonMapper);
        started.start();
        return started;
    }

    /**
     * Builds and starts an emitter with the given maximum batch size and with both the flush
     * interval and the flush count set to their maximum values.
     *
     * <p>The flush count previously came from Ant's unrelated {@code Execute.INVALID} constant,
     * which is replaced here by {@code Integer.MAX_VALUE}.
     *
     * @param maxBatchSize value handed to {@code setMaxBatchSize}
     * @param bufferSize   currently unused: no configuration setter is called with it.
     *                     NOTE(review): confirm whether a buffer/queue limit was meant to be set.
     * @return the started emitter
     */
    private HttpPostEmitter manualFlushEmitterWithBatchSizeAndBufferSize(int maxBatchSize, long bufferSize) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
                .setFlushMillis(Long.MAX_VALUE)
                .setFlushCount(Integer.MAX_VALUE)
                .setMaxBatchSize(maxBatchSize)
                .build();
        HttpPostEmitter started = new HttpPostEmitter(config, this.httpClient, jsonMapper);
        started.start();
        return started;
    }

    /**
     * Emits two events through an emitter with a flush count of two and checks that a single
     * request is made whose body is the JSON array of both events followed by a newline.
     */
    @Test
    public void testSanity() throws Exception {
        final List<UnitEvent> events = Arrays.asList(
                new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 1),
                new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 2));
        this.emitter = sizeBasedEmitter(2);
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                Assert.assertEquals(TARGET_URL, request.getUrl());
                Assert.assertEquals("application/json", request.getHeaders().get("Content-Type"));
                String expectedBody = StringUtils.format(
                        "[%s,%s]\n",
                        jsonMapper.writeValueAsString(events.get(0)),
                        jsonMapper.writeValueAsString(events.get(1)));
                // slice() leaves the position of the request's own buffer untouched.
                String actualBody = StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString();
                Assert.assertEquals(expectedBody, actualBody);
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(1));

        for (UnitEvent event : events) {
            this.emitter.emit(event);
        }
        waitForEmission(this.emitter, 0);
        closeNoFlush(this.emitter);
        Assert.assertTrue(this.httpClient.succeeded());
    }

    /**
     * Same scenario as the plain sanity test, but the emitter is obtained through
     * {@link #sizeBasedEmitterGeneralizedCreation(int)}: two events, flush count two, one request
     * whose body is the JSON array of both events followed by a newline.
     */
    @Test
    public void testSanityWithGeneralizedCreation() throws Exception {
        final List<UnitEvent> events = Arrays.asList(
                new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 1),
                new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 2));
        this.emitter = sizeBasedEmitterGeneralizedCreation(2);
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                Assert.assertEquals(TARGET_URL, request.getUrl());
                Assert.assertEquals("application/json", request.getHeaders().get("Content-Type"));
                String expectedBody = StringUtils.format(
                        "[%s,%s]\n",
                        jsonMapper.writeValueAsString(events.get(0)),
                        jsonMapper.writeValueAsString(events.get(1)));
                // slice() leaves the position of the request's own buffer untouched.
                String actualBody = StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString();
                Assert.assertEquals(expectedBody, actualBody);
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(1));

        for (UnitEvent event : events) {
            this.emitter.emit(event);
        }
        waitForEmission(this.emitter, 0);
        closeNoFlush(this.emitter);
        Assert.assertTrue(this.httpClient.succeeded());
    }

    /**
     * Drives an emitter with a flush count of three through three phases: two events with the
     * failing handler installed, a third event with a handler that accepts one request, and two
     * further events with the failing handler installed again before closing.
     */
    @Test
    public void testSizeBasedEmission() throws Exception {
        emitter = sizeBasedEmitter(3);

        // Phase 1: two events, fewer than the flush count of three.
        // NOTE(review): failingHandler presumably marks the client as failed if a request is made — confirm.
        httpClient.setGoHandler(GoHandlers.failingHandler());
        final String dataSource = CachingClusteredClientTest.DATA_SOURCE;
        emitter.emit(new UnitEvent(dataSource, 1));
        emitter.emit(new UnitEvent(dataSource, 2));

        // Phase 2: the third event reaches the flush count; batch 0 is awaited.
        httpClient.setGoHandler(GoHandlers.passingHandler(okResponse()).times(1));
        emitter.emit(new UnitEvent(dataSource, 3));
        waitForEmission(emitter, 0);

        // Phase 3: two more events, again fewer than the flush count.
        httpClient.setGoHandler(GoHandlers.failingHandler());
        emitter.emit(new UnitEvent(dataSource, 4));
        emitter.emit(new UnitEvent(dataSource, 5));

        // Closing installs a handler that accepts one request for the remaining events.
        closeAndExpectFlush(emitter);
        Assert.assertTrue(httpClient.succeeded());
    }

    /**
     * Checks that an emitter with a 100 ms flush interval sends each single event on its own and
     * that each request is observed in less than twice the flush interval after the emit call.
     *
     * <p>The upper bound used in the failure message and in the check is derived from the flush
     * interval. The decompiled code took the message value from an unrelated SQL-parser constant
     * while comparing against a separate literal {@code 200}.
     */
    @Test
    public void testTimeBasedEmission() throws Exception {
        final long flushMillis = 100L;
        final long maxWaitMillis = 2 * flushMillis;
        this.emitter = timeBasedEmitter(flushMillis);

        // First event: expect batch 0 to be sent by the timer.
        final CountDownLatch firstRequestSeen = new CountDownLatch(1);
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                firstRequestSeen.countDown();
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(1));
        long firstStart = System.currentTimeMillis();
        this.emitter.emit(new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 1));
        firstRequestSeen.await();
        long firstWaited = System.currentTimeMillis() - firstStart;
        Assert.assertTrue(
                StringUtils.format("timeWaited[%s] !< %s", firstWaited, maxWaitMillis),
                firstWaited < maxWaitMillis);
        waitForEmission(this.emitter, 0);

        // Second event: expect batch 1 to be sent by the timer.
        final CountDownLatch secondRequestSeen = new CountDownLatch(1);
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                secondRequestSeen.countDown();
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(1));
        long secondStart = System.currentTimeMillis();
        this.emitter.emit(new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 2));
        secondRequestSeen.await();
        long secondWaited = System.currentTimeMillis() - secondStart;
        Assert.assertTrue(
                StringUtils.format("timeWaited[%s] !< %s", secondWaited, maxWaitMillis),
                secondWaited < maxWaitMillis);
        waitForEmission(this.emitter, 1);

        closeNoFlush(this.emitter);
        Assert.assertTrue("httpClient.succeeded()", this.httpClient.succeeded());
    }

    /**
     * Verifies the emitted-event counter across a rejected and an accepted send: after a
     * BAD_REQUEST response the counter stays at zero, and once the handler answers OK (allowed
     * for two requests) the counter reaches two.
     */
    @Test(timeout = 60000)
    public void testFailedEmission() throws Exception {
        final UnitEvent firstEvent = new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 1);
        final UnitEvent secondEvent = new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 2);
        emitter = sizeBasedEmitter(1);
        Assert.assertEquals(0L, emitter.getTotalEmittedEvents());

        // Every request is answered with 400 BAD_REQUEST; no call limit is placed on this handler.
        GoHandler rejectingHandler = new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                Response badRequest = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST).build();
                return GoHandlers.immediateFuture(badRequest);
            }
        };
        httpClient.setGoHandler(rejectingHandler);
        emitter.emit(firstEvent);
        emitter.flush();
        waitForEmission(emitter, 0);
        Assert.assertTrue(httpClient.succeeded());
        Assert.assertEquals(0L, emitter.getTotalEmittedEvents());

        // Two requests are now allowed — presumably the earlier rejected batch plus the new one
        // (the final counter of 2 with only one new event supports this; confirm against the emitter).
        GoHandler acceptingHandler = new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(2);
        httpClient.setGoHandler(acceptingHandler);
        emitter.emit(secondEvent);
        emitter.flush();
        waitForEmission(emitter, 1);
        closeNoFlush(emitter);
        emitter.joinEmitterThread();
        Assert.assertEquals(2L, emitter.getTotalEmittedEvents());
        Assert.assertTrue(httpClient.succeeded());
    }

    /**
     * Emits two events through an emitter configured with basic authentication and newline
     * batching, flushes manually, and checks that the single request carries the expected
     * {@code Authorization} header and a body of one JSON document per line.
     */
    @Test
    public void testBasicAuthenticationAndNewlineSeparating() throws Exception {
        final List<UnitEvent> events = Arrays.asList(
                new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 1),
                new UnitEvent(CachingClusteredClientTest.DATA_SOURCE, 2));
        this.emitter = manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating("foo:bar");
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                Assert.assertEquals(TARGET_URL, request.getUrl());
                Assert.assertEquals("application/json", request.getHeaders().get("Content-Type"));
                // The header must be "Basic " followed by the base64 of the UTF-8 credentials.
                String expectedAuthorization = "Basic " + BaseEncoding.base64().encode(StringUtils.toUtf8("foo:bar"));
                Assert.assertEquals(expectedAuthorization, request.getHeaders().get("Authorization"));
                String expectedBody = StringUtils.format(
                        "%s\n%s\n",
                        jsonMapper.writeValueAsString(events.get(0)),
                        jsonMapper.writeValueAsString(events.get(1)));
                String actualBody = StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString();
                Assert.assertEquals(expectedBody, actualBody);
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(1));

        for (UnitEvent event : events) {
            this.emitter.emit(event);
        }
        this.emitter.flush();
        waitForEmission(this.emitter, 0);
        closeNoFlush(this.emitter);
        Assert.assertTrue(this.httpClient.succeeded());
    }

    /**
     * Emits four events whose feed string is 512000 'x' characters each through an emitter with a
     * 1 MiB maximum batch size, and checks that they are sent two per request, in order.
     *
     * <p>The counter assertions pin the observed sequence: two events are counted once batch 0
     * has been emitted without an explicit flush, and all four after {@code flush()} and batch 1.
     */
    @Test
    public void testBatchSplitting() throws Exception {
        byte[] feedBytes = new byte[512000];
        Arrays.fill(feedBytes, (byte) 'x');
        String bigFeed = StringUtils.fromUtf8(feedBytes);
        final List<UnitEvent> events = Arrays.asList(
                new UnitEvent(bigFeed, 1),
                new UnitEvent(bigFeed, 2),
                new UnitEvent(bigFeed, 3),
                new UnitEvent(bigFeed, 4));
        // Index of the next event a request is expected to contain; advanced twice per request.
        final AtomicInteger nextExpected = new AtomicInteger();
        this.emitter = manualFlushEmitterWithBatchSizeAndBufferSize(1024 * 1024, 5L * 1024 * 1024);
        Assert.assertEquals(0L, this.emitter.getTotalEmittedEvents());
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                Assert.assertEquals(TARGET_URL, request.getUrl());
                Assert.assertEquals("application/json", request.getHeaders().get("Content-Type"));
                // Arguments are evaluated left to right, so the two events are consumed in order.
                String expectedBody = StringUtils.format(
                        "[%s,%s]\n",
                        jsonMapper.writeValueAsString(events.get(nextExpected.getAndIncrement())),
                        jsonMapper.writeValueAsString(events.get(nextExpected.getAndIncrement())));
                String actualBody = StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString();
                Assert.assertEquals(expectedBody, actualBody);
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(3));

        for (UnitEvent event : events) {
            this.emitter.emit(event);
        }
        waitForEmission(this.emitter, 0);
        Assert.assertEquals(2L, this.emitter.getTotalEmittedEvents());

        this.emitter.flush();
        waitForEmission(this.emitter, 1);
        Assert.assertEquals(4L, this.emitter.getTotalEmittedEvents());

        closeNoFlush(this.emitter);
        Assert.assertTrue(this.httpClient.succeeded());
    }

    /**
     * Emits two events through an emitter configured with GZIP content encoding and checks that
     * the request is labelled {@code Content-Encoding: gzip} and that its body, once gunzipped,
     * is the JSON array of both events followed by a newline.
     */
    @Test
    public void testGzipContentEncoding() throws Exception {
        final List<UnitEvent> events = Arrays.asList(
                new UnitEvent("plain-text", 1),
                new UnitEvent("plain-text", 2));
        this.emitter = sizeBasedEmitterWithContentEncoding(2, ContentEncoding.GZIP);
        this.httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws IOException {
                Assert.assertEquals(TARGET_URL, request.getUrl());
                Assert.assertEquals("application/json", request.getHeaders().get("Content-Type"));
                Assert.assertEquals("gzip", request.getHeaders().get("Content-Encoding"));

                // Copy the compressed payload out of a slice so the request's buffer is not consumed.
                ByteBuffer compressed = request.getByteBufferData().slice();
                byte[] compressedBytes = new byte[compressed.remaining()];
                compressed.get(compressedBytes);
                ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
                CompressionUtils.gunzip(new ByteArrayInputStream(compressedBytes), decompressed);

                String expectedBody = StringUtils.format(
                        "[%s,%s]\n",
                        jsonMapper.writeValueAsString(events.get(0)),
                        jsonMapper.writeValueAsString(events.get(1)));
                Assert.assertEquals(expectedBody, decompressed.toString(StandardCharsets.UTF_8.name()));
                return GoHandlers.immediateFuture(okResponse());
            }
        }.times(1));

        for (UnitEvent event : events) {
            this.emitter.emit(event);
        }
        waitForEmission(this.emitter, 0);
        closeNoFlush(this.emitter);
        Assert.assertTrue(this.httpClient.succeeded());
    }

    /**
     * Installs a handler that answers one request with the OK response, then closes the emitter.
     *
     * @param toClose emitter to close
     * @throws IOException if closing the emitter fails
     */
    private void closeAndExpectFlush(Emitter toClose) throws IOException {
        GoHandler singleOkHandler = GoHandlers.passingHandler(okResponse()).times(1);
        httpClient.setGoHandler(singleOkHandler);
        toClose.close();
    }

    /**
     * Closes the emitter without changing the handler currently installed on the mock client.
     *
     * @param toClose emitter to close
     * @throws IOException if closing the emitter fails
     */
    private void closeNoFlush(Emitter toClose) throws IOException {
        toClose.close();
    }

    /**
     * Delegates to {@code HttpPostEmitter.waitForEmission} for the given batch number.
     *
     * @param target      emitter to wait on
     * @param batchNumber argument forwarded unchanged to the emitter
     * @throws Exception whatever the emitter's wait call throws
     */
    private void waitForEmission(HttpPostEmitter target, int batchNumber) throws Exception {
        target.waitForEmission(batchNumber);
    }
}
