/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import junit.framework.Assert;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.ThriftRpcClient;
import org.apache.flume.api.ThriftTestingSource;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link ThriftRpcClient} run against an in-process
 * {@link ThriftTestingSource}.
 *
 * <p>Each test starts a source with a specific handler type (OK, SLOW, FAIL,
 * ERROR, TIMEOUT), sends events through the client and then checks either the
 * events/counters recorded by the source or the exception raised by the client.
 */
public class TestThriftRpcClient {
    /** Header key carrying the sequence number of each generated event. */
    private static final String SEQ = "sequence";
    private final Properties props = new Properties();
    ThriftRpcClient client;
    ThriftTestingSource src;
    int port;

    /**
     * Picks a free port and builds the client configuration used by the tests
     * that create their client from {@link Properties}.
     */
    @Before
    public void setUp() throws Exception {
        this.props.setProperty("hosts", "h1");
        // Ask the OS for a currently unused port rather than guessing one at
        // random, which can collide with a port that is already bound.
        this.port = TestThriftRpcClient.findFreePort();
        this.props.setProperty("client.type", "thrift");
        this.props.setProperty("hosts.h1", "0.0.0.0:" + String.valueOf(this.port));
        this.props.setProperty("batch-size", "10");
        this.props.setProperty("request-timeout", "2000");
        this.props.setProperty("protocol", "compact");
    }

    /**
     * Closes the client (if one was created) and then stops the testing source
     * (if one was started). Both are null-checked so that a failure while a
     * test was still constructing them is not masked by a NullPointerException.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.client != null) {
                this.client.close();
            }
        }
        finally {
            if (this.src != null) {
                this.src.stop();
            }
        }
    }

    /**
     * Returns a port that was free at the time of the call by binding a
     * temporary server socket to port 0 and releasing it again.
     */
    private static int findFreePort() throws java.io.IOException {
        java.net.ServerSocket probe = new java.net.ServerSocket(0);
        try {
            return probe.getLocalPort();
        }
        finally {
            probe.close();
        }
    }

    /**
     * Sends {@code count} events one at a time via {@link RpcClient#append}.
     * Event {@code i} (0-based) has body {@code String.valueOf(i)} and carries
     * the same value in its {@code "sequence"} header.
     *
     * @param client client used to send the events
     * @param count number of events to send
     * @throws Exception if the client fails to deliver an event
     */
    public static void insertEvents(RpcClient client, int count) throws Exception {
        for (int i = 0; i < count; ++i) {
            HashMap<String, String> header = new HashMap<String, String>();
            header.put(SEQ, String.valueOf(i));
            client.append(EventBuilder.withBody(String.valueOf(i).getBytes(), header));
        }
    }

    /**
     * Sends the events numbered {@code start} through {@code limit} (both
     * inclusive) with a single {@link RpcClient#appendBatch} call. Each event
     * has its number as body and as {@code "sequence"} header.
     *
     * @param client client used to send the events
     * @param start number of the first event
     * @param limit number of the last event (inclusive)
     * @throws Exception if the client fails to deliver the batch
     */
    public static void insertAsBatch(RpcClient client, int start, int limit) throws Exception {
        ArrayList<Event> events = new ArrayList<Event>();
        for (int i = start; i <= limit; ++i) {
            HashMap<String, String> header = new HashMap<String, String>();
            header.put(SEQ, String.valueOf(i));
            events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), header));
        }
        client.appendBatch(events);
    }

    /**
     * Asserts that the source received exactly {@code expectedTotal} events
     * whose bodies are 0, 1, 2, ... in arrival order.
     */
    private void assertSequentialEvents(int expectedTotal) {
        Assert.assertEquals(expectedTotal, this.src.flumeEvents.size());
        int expected = 0;
        for (Event e : this.src.flumeEvents) {
            Assert.assertEquals(String.valueOf(expected++), new String(e.getBody()));
        }
    }

    /**
     * Happy path: 10 single appends followed by batches of 16 and 12 events
     * with a client batch size of 10, i.e. 4 batches of which 2 are incomplete.
     */
    @Test
    public void testOK() throws Exception {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), this.port, "compact");
        this.client = (ThriftRpcClient)RpcClientFactory.getInstance(this.props);
        TestThriftRpcClient.insertEvents(this.client, 10);
        TestThriftRpcClient.insertAsBatch(this.client, 10, 25);
        TestThriftRpcClient.insertAsBatch(this.client, 26, 37);
        this.assertSequentialEvents(38);
        Assert.assertEquals(10, this.src.individualCount);
        Assert.assertEquals(4, this.src.batchCount);
        Assert.assertEquals(2, this.src.incompleteBatches);
    }

    /**
     * Same checks as {@link #testOK()} against the SLOW handler: 2 single
     * appends followed by batches of 24 and 12 events, i.e. 5 batches of which
     * 2 are incomplete.
     */
    @Test
    public void testSlow() throws Exception {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.SLOW.name(), this.port, "compact");
        this.client = (ThriftRpcClient)RpcClientFactory.getInstance(this.props);
        TestThriftRpcClient.insertEvents(this.client, 2);
        TestThriftRpcClient.insertAsBatch(this.client, 2, 25);
        TestThriftRpcClient.insertAsBatch(this.client, 26, 37);
        this.assertSequentialEvents(38);
        Assert.assertEquals(2, this.src.individualCount);
        Assert.assertEquals(5, this.src.batchCount);
        Assert.assertEquals(2, this.src.incompleteBatches);
    }

    /** The FAIL handler must make the client raise an EventDeliveryException. */
    @Test(expected=EventDeliveryException.class)
    public void testFail() throws Exception {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.FAIL.name(), this.port, "compact");
        this.client = (ThriftRpcClient)RpcClientFactory.getInstance(this.props);
        TestThriftRpcClient.insertEvents(this.client, 2);
        Assert.fail("Expected EventDeliveryException to be thrown.");
    }

    /**
     * The ERROR handler must make the client raise an EventDeliveryException
     * with the message {@code "Failed to send event. "}.
     */
    @Test
    public void testError() throws Throwable {
        try {
            this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR.name(), this.port, "compact");
            this.client = (ThriftRpcClient)RpcClientFactory.getThriftInstance("0.0.0.0", this.port);
            TestThriftRpcClient.insertEvents(this.client, 2);
        }
        catch (EventDeliveryException ex) {
            Assert.assertEquals("Failed to send event. ", ex.getMessage());
            return;
        }
        // Without this the test would pass even if no exception were raised.
        Assert.fail("Expected EventDeliveryException to be thrown.");
    }

    /**
     * With the TIMEOUT handler the EventDeliveryException raised by the client
     * must be caused by a {@link TimeoutException}.
     */
    @Test(expected=TimeoutException.class)
    public void testTimeout() throws Throwable {
        try {
            this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT.name(), this.port, "compact");
            this.client = (ThriftRpcClient)RpcClientFactory.getThriftInstance(this.props);
            TestThriftRpcClient.insertEvents(this.client, 2);
        }
        catch (EventDeliveryException ex) {
            // Surface the root cause; fall back to the exception itself so a
            // missing cause is reported clearly instead of as "throw null".
            Throwable cause = ex.getCause();
            throw cause != null ? cause : ex;
        }
    }

    /**
     * Sends the batch 0..9 from 100 threads sharing one client and checks that
     * the source ends up with exactly 100 copies of each of the 10 bodies.
     */
    @Test
    public void testMultipleThreads() throws Throwable {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), this.port, "compact");
        this.client = (ThriftRpcClient)RpcClientFactory.getThriftInstance("0.0.0.0", this.port, 10);
        final RpcClient sharedClient = this.client;
        final int threadCount = 100;
        final int eventsPerThread = 10;
        ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount);
        try {
            ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount);
            for (int i = 0; i < threadCount; ++i) {
                futures.add(submissionSvc.submit(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            TestThriftRpcClient.insertAsBatch(sharedClient, 0, eventsPerThread - 1);
                        }
                        catch (Exception e) {
                            // Propagate through Future.get() so a failed
                            // submission fails the test with its real cause.
                            throw new RuntimeException(e);
                        }
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        }
        finally {
            // Always release the worker threads, also when a submission failed.
            submissionSvc.shutdownNow();
        }
        ArrayList<String> events = new ArrayList<String>();
        for (Event e : this.src.flumeEvents) {
            events.add(new String(e.getBody()));
        }
        // Guard against a vacuous pass when fewer (or no) events arrived.
        Assert.assertEquals(threadCount * eventsPerThread, events.size());
        // Bodies are the single digits 0..9, so after sorting every run of
        // threadCount consecutive entries must hold the same digit.
        Collections.sort(events);
        for (int idx = 0; idx < events.size(); ++idx) {
            Assert.assertEquals(String.valueOf(idx / threadCount), events.get(idx));
        }
    }
}

