package org.apache.flume.api;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.avro.ipc.Server;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcTestUtils;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/api/TestNettyAvroRpcClient.class */
public class TestNettyAvroRpcClient {
    private static final Logger logger = LoggerFactory.getLogger(TestNettyAvroRpcClient.class);
    private static final String localhost = "127.0.0.1";

    @Test
    public void testOKServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler());
    }

    @Test
    public void testOKServerSimpleCompressionLevel6() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 6);
    }

    @Test
    public void testOKServerSimpleCompressionLevel0() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 0);
    }

    @Test(expected = EventDeliveryException.class)
    public void testOKServerSimpleCompressionClientOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), false, true, 6);
    }

    @Test(expected = EventDeliveryException.class)
    public void testOKServerSimpleCompressionServerOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), true, false, 6);
    }

    @Test
    public void testOKServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler());
    }

    @Test
    public void testOKServerBatchCompressionLevel0() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 0);
    }

    @Test
    public void testOKServerBatchCompressionLevel6() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 6);
    }

    @Test(expected = EventDeliveryException.class)
    public void testOKServerBatchCompressionServerOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), true, false, 6);
    }

    @Test(expected = EventDeliveryException.class)
    public void testOKServerBatchCompressionClientOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), false, true, 6);
    }

    @Test(expected = FlumeException.class)
    public void testUnableToConnect() throws FlumeException {
        NettyAvroRpcClient nettyAvroRpcClient = new NettyAvroRpcClient();
        Properties properties = new Properties();
        properties.setProperty("hosts", "localhost");
        properties.setProperty("hosts.localhost", "127.0.0.1:1");
        nettyAvroRpcClient.configure(properties);
    }

    @Test
    public void testBatchOverrun() throws FlumeException, EventDeliveryException {
        int i = 10 + 1;
        NettyAvroRpcClient nettyAvroRpcClient = null;
        Server startServer = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        Properties properties = new Properties();
        properties.setProperty("hosts", "localhost");
        properties.setProperty("hosts.localhost", "127.0.0.1:" + startServer.getPort());
        properties.setProperty("batch-size", "10");
        try {
            nettyAvroRpcClient = new NettyAvroRpcClient();
            nettyAvroRpcClient.configure(properties);
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(EventBuilder.withBody("evt: " + i2, Charset.forName("UTF8")));
            }
            nettyAvroRpcClient.appendBatch(arrayList);
            RpcTestUtils.stopServer(startServer);
            if (nettyAvroRpcClient != null) {
                nettyAvroRpcClient.close();
            }
        } catch (Throwable th) {
            RpcTestUtils.stopServer(startServer);
            if (nettyAvroRpcClient != null) {
                nettyAvroRpcClient.close();
            }
            throw th;
        }
    }

    @Test(expected = EventDeliveryException.class)
    public void testServerDisconnect() throws FlumeException, EventDeliveryException, InterruptedException {
        NettyAvroRpcClient nettyAvroRpcClient = null;
        Server startServer = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        try {
            nettyAvroRpcClient = RpcTestUtils.getStockLocalClient(startServer.getPort());
            startServer.close();
            Thread.sleep(1000L);
            try {
                startServer.join();
            } catch (InterruptedException e) {
                logger.warn("Thread interrupted during join()", e);
                Thread.currentThread().interrupt();
            }
            try {
                nettyAvroRpcClient.append(EventBuilder.withBody("hello", Charset.forName("UTF8")));
                Assert.assertFalse("Client should not be active", nettyAvroRpcClient.isActive());
                RpcTestUtils.stopServer(startServer);
                if (nettyAvroRpcClient != null) {
                    nettyAvroRpcClient.close();
                }
            } catch (Throwable th) {
                Assert.assertFalse("Client should not be active", nettyAvroRpcClient.isActive());
                throw th;
            }
        } catch (Throwable th2) {
            RpcTestUtils.stopServer(startServer);
            if (nettyAvroRpcClient != null) {
                nettyAvroRpcClient.close();
            }
            throw th2;
        }
    }

    @Test(expected = EventDeliveryException.class)
    public void testClientClosedRequest() throws FlumeException, EventDeliveryException {
        NettyAvroRpcClient nettyAvroRpcClient = null;
        Server startServer = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        try {
            nettyAvroRpcClient = RpcTestUtils.getStockLocalClient(startServer.getPort());
            nettyAvroRpcClient.close();
            Assert.assertFalse("Client should not be active", nettyAvroRpcClient.isActive());
            System.out.println("Yaya! I am not active after client close!");
            nettyAvroRpcClient.append(EventBuilder.withBody("hello", Charset.forName("UTF8")));
            RpcTestUtils.stopServer(startServer);
            if (nettyAvroRpcClient != null) {
                nettyAvroRpcClient.close();
            }
        } catch (Throwable th) {
            RpcTestUtils.stopServer(startServer);
            if (nettyAvroRpcClient != null) {
                nettyAvroRpcClient.close();
            }
            throw th;
        }
    }

    @Test(expected = EventDeliveryException.class)
    public void testFailedServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.FailedAvroHandler());
        logger.error("Failed: I should never have gotten here!");
    }

    @Test(expected = EventDeliveryException.class)
    public void testUnknownServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.UnknownAvroHandler());
        logger.error("Unknown: I should never have gotten here!");
    }

    @Test(expected = EventDeliveryException.class)
    public void testThrowingServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.ThrowingAvroHandler());
        logger.error("Throwing: I should never have gotten here!");
    }

    @Test(expected = EventDeliveryException.class)
    public void testFailedServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.FailedAvroHandler());
        logger.error("Failed: I should never have gotten here!");
    }

    @Test(expected = EventDeliveryException.class)
    public void testUnknownServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.UnknownAvroHandler());
        logger.error("Unknown: I should never have gotten here!");
    }

    @Test(expected = EventDeliveryException.class)
    public void testThrowingServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.ThrowingAvroHandler());
        logger.error("Throwing: I should never have gotten here!");
    }

    @Test
    public void spinThreadsCrazily() throws IOException {
        int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
        ServerSocket serverSocket = new ServerSocket(0);
        int localPort = serverSocket.getLocalPort();
        serverSocket.close();
        Properties properties = new Properties();
        properties.put("client.type", RpcClientConfigurationConstants.DEFAULT_CLIENT_TYPE);
        properties.put("hosts", "h1");
        properties.put("hosts.h1", "localhost:" + localPort);
        properties.put("connect-timeout", "20");
        properties.put("request-timeout", "20");
        properties.put("batch-size", "1");
        for (int i = 0; i < 1000; i++) {
            RpcClient rpcClient = null;
            try {
                try {
                    rpcClient = RpcClientFactory.getDefaultInstance("localhost", Integer.valueOf(localPort));
                    rpcClient.append(EventBuilder.withBody("Hello", Charset.forName("UTF-8")));
                    if (rpcClient != null) {
                        rpcClient.close();
                    }
                } catch (EventDeliveryException e) {
                    logger.warn("Expected error", e);
                    if (rpcClient != null) {
                        rpcClient.close();
                    }
                } catch (FlumeException e2) {
                    logger.warn("Unexpected error", e2);
                    if (rpcClient != null) {
                        rpcClient.close();
                    }
                }
            } catch (Throwable th) {
                if (rpcClient != null) {
                    rpcClient.close();
                }
                throw th;
            }
        }
        int threadCount2 = ManagementFactory.getThreadMXBean().getThreadCount();
        logger.warn("Init thread count: {}, thread count: {}", Integer.valueOf(threadCount), Integer.valueOf(threadCount2));
        Assert.assertEquals("Thread leak in RPC client", threadCount, threadCount2);
    }
}
