/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import junit.framework.Assert;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.NettyAvroRpcClient;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcTestUtils {
    private static final Logger logger = LoggerFactory.getLogger(RpcTestUtils.class);
    private static final String localhost = "localhost";

    public static void handlerSimpleAppendTest(AvroSourceProtocol handler) throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(handler, false, false, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void handlerSimpleAppendTest(AvroSourceProtocol handler, boolean enableServerCompression, boolean enableClientCompression, int compressionLevel) throws FlumeException, EventDeliveryException {
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(handler, 0, enableServerCompression);
        try {
            Properties starterProp = new Properties();
            if (enableClientCompression) {
                starterProp.setProperty("compression-type", "deflate");
                starterProp.setProperty("compression-level", "" + compressionLevel);
            } else {
                starterProp.setProperty("compression-type", "none");
            }
            client = RpcTestUtils.getStockLocalClient(server.getPort(), starterProp);
            boolean isActive = client.isActive();
            Assert.assertTrue((String)"Client should be active", (boolean)isActive);
            client.append(EventBuilder.withBody((String)"wheee!!!", (Charset)Charset.forName("UTF8")));
        }
        finally {
            RpcTestUtils.stopServer(server);
            if (client != null) {
                client.close();
            }
        }
    }

    public static void handlerBatchAppendTest(AvroSourceProtocol handler) throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(handler, false, false, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void handlerBatchAppendTest(AvroSourceProtocol handler, boolean enableServerCompression, boolean enableClientCompression, int compressionLevel) throws FlumeException, EventDeliveryException {
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(handler, 0, enableServerCompression);
        try {
            Properties starterProp = new Properties();
            if (enableClientCompression) {
                starterProp.setProperty("compression-type", "deflate");
                starterProp.setProperty("compression-level", "" + compressionLevel);
            } else {
                starterProp.setProperty("compression-type", "none");
            }
            client = RpcTestUtils.getStockLocalClient(server.getPort(), starterProp);
            boolean isActive = client.isActive();
            Assert.assertTrue((String)"Client should be active", (boolean)isActive);
            int batchSize = client.getBatchSize();
            ArrayList<Event> events = new ArrayList<Event>();
            for (int i = 0; i < batchSize; ++i) {
                events.add(EventBuilder.withBody((String)("evt: " + i), (Charset)Charset.forName("UTF8")));
            }
            client.appendBatch(events);
        }
        finally {
            RpcTestUtils.stopServer(server);
            if (client != null) {
                client.close();
            }
        }
    }

    public static NettyAvroRpcClient getStockLocalClient(int port) {
        Properties props = new Properties();
        return RpcTestUtils.getStockLocalClient(port, props);
    }

    public static NettyAvroRpcClient getStockLocalClient(int port, Properties starterProp) {
        starterProp.setProperty("hosts", "h1");
        starterProp.setProperty("hosts.h1", "127.0.0.1:" + port);
        NettyAvroRpcClient client = new NettyAvroRpcClient();
        client.configure(starterProp);
        return client;
    }

    public static Server startServer(AvroSourceProtocol handler, int port, boolean enableCompression) {
        SpecificResponder responder = new SpecificResponder(AvroSourceProtocol.class, (Object)handler);
        NettyServer server = enableCompression ? new NettyServer((Responder)responder, new InetSocketAddress(localhost, port), (ChannelFactory)new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool()), (ChannelPipelineFactory)new CompressionChannelPipelineFactory(), null) : new NettyServer((Responder)responder, new InetSocketAddress(localhost, port));
        server.start();
        logger.info("Server started on hostname: {}, port: {}", new Object[]{localhost, Integer.toString(server.getPort())});
        try {
            Thread.sleep(300L);
        }
        catch (InterruptedException ex) {
            logger.error("Thread interrupted. Exception follows.", (Throwable)ex);
            Thread.currentThread().interrupt();
        }
        return server;
    }

    public static Server startServer(AvroSourceProtocol handler) {
        return RpcTestUtils.startServer(handler, 0, false);
    }

    public static Server startServer(AvroSourceProtocol handler, int port) {
        return RpcTestUtils.startServer(handler, port, false);
    }

    public static void stopServer(Server server) {
        try {
            server.close();
            server.join();
        }
        catch (InterruptedException ex) {
            logger.error("Thread interrupted. Exception follows.", (Throwable)ex);
            Thread.currentThread().interrupt();
        }
    }

    private static class CompressionChannelPipelineFactory
    implements ChannelPipelineFactory {
        private CompressionChannelPipelineFactory() {
        }

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            ZlibEncoder encoder = new ZlibEncoder(6);
            pipeline.addFirst("deflater", (ChannelHandler)encoder);
            pipeline.addFirst("inflater", (ChannelHandler)new ZlibDecoder());
            return pipeline;
        }
    }

    public static class ThrowingAvroHandler
    implements AvroSourceProtocol {
        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            logger.info("Throwing: Received event from append(): {}", (Object)new String(event.getBody().array(), Charset.forName("UTF8")));
            throw new AvroRemoteException((Object)"Handler smash!");
        }

        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            logger.info("Throwing: Received {} events from appendBatch()", (Object)events.size());
            throw new AvroRemoteException((Object)"Handler smash!");
        }
    }

    public static class UnknownAvroHandler
    implements AvroSourceProtocol {
        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            logger.info("Unknown: Received event from append(): {}", (Object)new String(event.getBody().array(), Charset.forName("UTF8")));
            return Status.UNKNOWN;
        }

        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            logger.info("Unknown: Received {} events from appendBatch()", (Object)events.size());
            return Status.UNKNOWN;
        }
    }

    public static class FailedAvroHandler
    implements AvroSourceProtocol {
        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            logger.info("Failed: Received event from append(): {}", (Object)new String(event.getBody().array(), Charset.forName("UTF8")));
            return Status.FAILED;
        }

        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            logger.info("Failed: Received {} events from appendBatch()", (Object)events.size());
            return Status.FAILED;
        }
    }

    public static class OKAvroHandler
    implements AvroSourceProtocol {
        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            logger.info("OK: Received event from append(): {}", (Object)new String(event.getBody().array(), Charset.forName("UTF8")));
            return Status.OK;
        }

        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            logger.info("OK: Received {} events from appendBatch()", (Object)events.size());
            return Status.OK;
        }
    }

    public static class LoadBalancedAvroHandler
    implements AvroSourceProtocol {
        private int appendCount = 0;
        private int appendBatchCount = 0;
        private boolean failed = false;

        public int getAppendCount() {
            return this.appendCount;
        }

        public int getAppendBatchCount() {
            return this.appendBatchCount;
        }

        public boolean isFailed() {
            return this.failed;
        }

        public void setFailed() {
            this.failed = true;
        }

        public void setOK() {
            this.failed = false;
        }

        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            if (this.failed) {
                logger.debug("Event rejected");
                return Status.FAILED;
            }
            logger.debug("LB: Received event from append(): {}", (Object)new String(event.getBody().array(), Charset.forName("UTF8")));
            ++this.appendCount;
            return Status.OK;
        }

        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            if (this.failed) {
                logger.debug("Event batch rejected");
                return Status.FAILED;
            }
            logger.debug("LB: Received {} events from appendBatch()", (Object)events.size());
            ++this.appendBatchCount;
            return Status.OK;
        }
    }
}

