/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;

/**
 * Test helper that runs a Thrift {@code THsHaServer} speaking the
 * {@link ThriftSourceProtocol} (compact protocol) on a given port and answers
 * requests with one of several canned behaviours selected by {@link HandlerType}.
 *
 * <p>Events accepted by the OK-style handlers are collected in {@link #flumeEvents};
 * the public counters record how many individual appends and batches were received.
 *
 * <p>NOTE(review): the counters are written by the handler methods and are
 * presumably read by a different (test) thread; they are therefore {@code volatile}
 * and incremented under a private lock. Confirm against the server's threading model.
 */
public class ThriftTestingSource {
    /**
     * Batches with fewer events than this are counted in {@link #incompleteBatches}.
     * Presumably matches the batch size the tests configure -- TODO confirm.
     */
    private static final int FULL_BATCH_SIZE = 10;

    /** Delay applied by the {@link HandlerType#TIMEOUT} handler before responding. */
    private static final long TIMEOUT_DELAY_MILLIS = 5000L;

    /** Delay applied by the {@link HandlerType#SLOW} handler before responding. */
    private static final long SLOW_DELAY_MILLIS = 1550L;

    /** Every event accepted by an OK-style handler, in arrival order. */
    public final Queue<Event> flumeEvents = new ConcurrentLinkedQueue<Event>();
    private final TServer server;
    /** Runs {@code server.serve()}; kept so that {@link #stop()} can release its thread. */
    private final ExecutorService executor;
    /** Guards the read-modify-write increments of the public counters. */
    private final Object counterLock = new Object();
    /** Number of {@code appendBatch} calls accepted by an OK-style handler. */
    public volatile int batchCount = 0;
    /** Number of {@code append} calls accepted by an OK-style handler. */
    public volatile int individualCount = 0;
    /** Number of accepted batches that held fewer than {@value #FULL_BATCH_SIZE} events. */
    public volatile int incompleteBatches = 0;
    /** Optional per-request delay (milliseconds) used by the ALTERNATE handler; may be null. */
    private volatile AtomicLong delay = null;

    /**
     * Sets the delay source consulted by the {@link HandlerType#ALTERNATE} handler.
     *
     * @param delay holder of the delay in milliseconds, or {@code null} for no delay
     */
    public void setDelay(AtomicLong delay) {
        this.delay = delay;
    }

    /**
     * Binds a non-blocking server socket on {@code 0.0.0.0:port} and starts serving
     * on a background thread.
     *
     * @param handlerName name of a {@link HandlerType} constant selecting the behaviour
     * @param port TCP port to listen on
     * @throws IllegalArgumentException if {@code handlerName} is not a {@link HandlerType} name
     * @throws Exception if the server socket cannot be created
     */
    public ThriftTestingSource(String handlerName, int port) throws Exception {
        // Resolve the handler first so that a bad name fails before a socket is bound.
        ThriftSourceProtocol.Iface handler = createHandler(handlerName);
        TNonblockingServerSocket serverTransport =
            new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port));
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        serverArgs.processor(new ThriftSourceProtocol.Processor<ThriftSourceProtocol.Iface>(handler));
        serverArgs.protocolFactory(new TCompactProtocol.Factory());
        this.server = new THsHaServer(serverArgs);
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(new Runnable() {
            @Override
            public void run() {
                ThriftTestingSource.this.server.serve();
            }
        });
    }

    /**
     * Stops the Thrift server and releases the background thread that was serving it.
     */
    public void stop() {
        this.server.stop();
        // Without this the executor's non-daemon worker thread would outlive the server.
        this.executor.shutdown();
    }

    /** Maps a {@link HandlerType} name to a fresh handler instance. */
    private ThriftSourceProtocol.Iface createHandler(String handlerName) {
        final HandlerType type;
        try {
            type = HandlerType.valueOf(handlerName);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown handler type: " + handlerName, e);
        }
        switch (type) {
            case OK:
                return new ThriftOKHandler();
            case FAIL:
                return new ThriftFailHandler();
            case ERROR:
                return new ThriftErrorHandler();
            case SLOW:
                return new ThriftSlowHandler();
            case TIMEOUT:
                return new ThriftTimeoutHandler();
            case ALTERNATE:
                return new ThriftAlternateHandler();
            default:
                throw new AssertionError("Unhandled handler type: " + type);
        }
    }

    /** Sleeps for {@code millis}; on interruption restores the flag and throws FlumeException. */
    private static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new FlumeException("Error", e);
        }
    }

    /** Behaviours the testing source can be asked to exhibit. */
    public static enum HandlerType {
        OK,
        FAIL,
        ERROR,
        SLOW,
        TIMEOUT,
        ALTERNATE;

    }

    /**
     * Behaves like {@link ThriftOKHandler} but first sleeps for the currently
     * configured delay, if one has been set via {@link #setDelay(AtomicLong)}.
     */
    private class ThriftAlternateHandler
    extends ThriftOKHandler {
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            sleepForConfiguredDelay();
            return super.append(event);
        }

        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            sleepForConfiguredDelay();
            return super.appendBatch(events);
        }

        /** Sleeps for the configured delay; a no-op when no delay holder is set. */
        private void sleepForConfiguredDelay() {
            // Read the field once so a concurrent setDelay(null) cannot cause an NPE.
            AtomicLong currentDelay = ThriftTestingSource.this.delay;
            if (currentDelay != null) {
                sleepMillis(currentDelay.get());
            }
        }
    }

    /** Behaves like {@link ThriftOKHandler} after sleeping 5000 ms on every call. */
    private class ThriftTimeoutHandler
    extends ThriftOKHandler {
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            sleepMillis(TIMEOUT_DELAY_MILLIS);
            return super.append(event);
        }

        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            sleepMillis(TIMEOUT_DELAY_MILLIS);
            return super.appendBatch(events);
        }
    }

    /** Behaves like {@link ThriftOKHandler} after sleeping 1550 ms on every call. */
    private class ThriftSlowHandler
    extends ThriftOKHandler {
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            sleepMillis(SLOW_DELAY_MILLIS);
            return super.append(event);
        }

        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            sleepMillis(SLOW_DELAY_MILLIS);
            return super.appendBatch(events);
        }
    }

    /** Rejects every call by throwing a {@link FlumeException}. */
    private class ThriftErrorHandler
    implements ThriftSourceProtocol.Iface {
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            throw new FlumeException("Forced Error");
        }

        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            throw new FlumeException("Forced Error");
        }
    }

    /** Answers every call with {@link Status#FAILED} without recording anything. */
    private class ThriftFailHandler
    implements ThriftSourceProtocol.Iface {
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            return Status.FAILED;
        }

        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            return Status.FAILED;
        }
    }

    /**
     * Accepts every call: stores the received events in {@link #flumeEvents},
     * updates the counters and answers {@link Status#OK}.
     */
    private class ThriftOKHandler
    implements ThriftSourceProtocol.Iface {
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            ThriftTestingSource.this.flumeEvents.add(
                EventBuilder.withBody(event.getBody(), event.getHeaders()));
            synchronized (ThriftTestingSource.this.counterLock) {
                ++ThriftTestingSource.this.individualCount;
            }
            return Status.OK;
        }

        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            synchronized (ThriftTestingSource.this.counterLock) {
                ++ThriftTestingSource.this.batchCount;
                if (events.size() < FULL_BATCH_SIZE) {
                    ++ThriftTestingSource.this.incompleteBatches;
                }
            }
            for (ThriftFlumeEvent event : events) {
                ThriftTestingSource.this.flumeEvents.add(
                    EventBuilder.withBody(event.getBody(), event.getHeaders()));
            }
            return Status.OK;
        }
    }
}

