package org.apache.flume.source;

import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.Source;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/NetcatSource.class */
public class NetcatSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(NetcatSource.class);
    private String hostName;
    private int maxLineLength;
    private boolean ackEveryEvent;
    private String sourceEncoding;
    private ServerSocketChannel serverSocket;
    private Thread acceptThread;
    private ExecutorService handlerService;
    private int port = 0;
    private CounterGroup counterGroup = new CounterGroup();
    private AtomicBoolean acceptThreadShouldStop = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/flume/source/NetcatSource$AcceptHandler.class */
    private static class AcceptHandler implements Runnable {
        private ServerSocketChannel serverSocket;
        private CounterGroup counterGroup;
        private ExecutorService handlerService;
        private EventDrivenSource source;
        private AtomicBoolean shouldStop;
        private boolean ackEveryEvent;
        private String sourceEncoding;
        private final int maxLineLength;

        public AcceptHandler(int i) {
            this.maxLineLength = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            NetcatSource.logger.debug("Starting accept handler");
            while (!this.shouldStop.get()) {
                try {
                    SocketChannel accept = this.serverSocket.accept();
                    NetcatSocketHandler netcatSocketHandler = new NetcatSocketHandler(this.maxLineLength);
                    netcatSocketHandler.socketChannel = accept;
                    netcatSocketHandler.counterGroup = this.counterGroup;
                    netcatSocketHandler.source = this.source;
                    netcatSocketHandler.ackEveryEvent = this.ackEveryEvent;
                    netcatSocketHandler.sourceEncoding = this.sourceEncoding;
                    this.handlerService.submit(netcatSocketHandler);
                    this.counterGroup.incrementAndGet("accept.succeeded");
                } catch (ClosedByInterruptException e) {
                } catch (IOException e2) {
                    NetcatSource.logger.error("Unable to accept connection. Exception follows.", e2);
                    this.counterGroup.incrementAndGet("accept.failed");
                }
            }
            NetcatSource.logger.debug("Accept handler exiting");
        }
    }

    /* loaded from: input_file:org/apache/flume/source/NetcatSource$NetcatSocketHandler.class */
    private static class NetcatSocketHandler implements Runnable {
        private Source source;
        private CounterGroup counterGroup;
        private SocketChannel socketChannel;
        private boolean ackEveryEvent;
        private String sourceEncoding;
        private final int maxLineLength;

        public NetcatSocketHandler(int i) {
            this.maxLineLength = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            NetcatSource.logger.debug("Starting connection handler");
            try {
                Reader newReader = Channels.newReader(this.socketChannel, this.sourceEncoding);
                Writer newWriter = Channels.newWriter(this.socketChannel, this.sourceEncoding);
                CharBuffer allocate = CharBuffer.allocate(this.maxLineLength);
                allocate.flip();
                while (true) {
                    int fill = fill(allocate, newReader);
                    NetcatSource.logger.debug("Chars read = {}", Integer.valueOf(fill));
                    int processEvents = processEvents(allocate, newWriter);
                    NetcatSource.logger.debug("Events processed = {}", Integer.valueOf(processEvents));
                    if (fill == -1) {
                        break;
                    }
                    if (fill == 0 && processEvents == 0 && allocate.remaining() == allocate.capacity()) {
                        NetcatSource.logger.warn("Client sent event exceeding the maximum length");
                        this.counterGroup.incrementAndGet("events.failed");
                        newWriter.write("FAILED: Event exceeds the maximum length (" + allocate.capacity() + " chars, including newline)\n");
                        newWriter.flush();
                        break;
                    }
                }
                this.socketChannel.close();
                this.counterGroup.incrementAndGet("sessions.completed");
            } catch (IOException e) {
                this.counterGroup.incrementAndGet("sessions.broken");
                try {
                    this.socketChannel.close();
                } catch (IOException e2) {
                    NetcatSource.logger.error("Unable to close socket channel. Exception follows.", e2);
                }
            }
            NetcatSource.logger.debug("Connection handler exiting");
        }

        private int processEvents(CharBuffer charBuffer, Writer writer) throws IOException {
            int i = 0;
            boolean z = true;
            while (z) {
                z = false;
                int limit = charBuffer.limit();
                int position = charBuffer.position();
                while (true) {
                    if (position >= limit) {
                        break;
                    }
                    if (charBuffer.get(position) == '\n') {
                        charBuffer.limit(position);
                        ByteBuffer encode = Charsets.UTF_8.encode(charBuffer);
                        charBuffer.limit(limit);
                        byte[] bArr = new byte[encode.remaining()];
                        encode.get(bArr);
                        ChannelException channelException = null;
                        try {
                            this.source.getChannelProcessor().processEvent(EventBuilder.withBody(bArr));
                        } catch (ChannelException e) {
                            channelException = e;
                        }
                        if (channelException == null) {
                            this.counterGroup.incrementAndGet("events.processed");
                            i++;
                            if (true == this.ackEveryEvent) {
                                writer.write("OK\n");
                            }
                        } else {
                            this.counterGroup.incrementAndGet("events.failed");
                            NetcatSource.logger.warn("Error processing event. Exception follows.", channelException);
                            writer.write("FAILED: " + channelException.getMessage() + "\n");
                        }
                        writer.flush();
                        charBuffer.position(position + 1);
                        z = true;
                    } else {
                        position++;
                    }
                }
            }
            return i;
        }

        private int fill(CharBuffer charBuffer, Reader reader) throws IOException {
            charBuffer.compact();
            int read = reader.read(charBuffer);
            this.counterGroup.addAndGet("characters.received", Long.valueOf(read));
            charBuffer.flip();
            return read;
        }
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, "bind", "port");
        this.hostName = context.getString("bind");
        this.port = context.getInteger("port").intValue();
        this.ackEveryEvent = context.getBoolean(NetcatSourceConfigurationConstants.CONFIG_ACKEVENT, true).booleanValue();
        this.maxLineLength = context.getInteger(NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH, Integer.valueOf(NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH)).intValue();
        this.sourceEncoding = context.getString(NetcatSourceConfigurationConstants.CONFIG_SOURCE_ENCODING, NetcatSourceConfigurationConstants.DEFAULT_ENCODING);
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Source starting");
        this.counterGroup.incrementAndGet("open.attempts");
        this.handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("netcat-handler-%d").build());
        try {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(this.hostName, this.port);
            this.serverSocket = ServerSocketChannel.open();
            this.serverSocket.socket().setReuseAddress(true);
            this.serverSocket.socket().bind(inetSocketAddress);
            logger.info("Created serverSocket:{}", this.serverSocket);
            AcceptHandler acceptHandler = new AcceptHandler(this.maxLineLength);
            this.acceptThreadShouldStop.set(false);
            acceptHandler.counterGroup = this.counterGroup;
            acceptHandler.handlerService = this.handlerService;
            acceptHandler.shouldStop = this.acceptThreadShouldStop;
            acceptHandler.ackEveryEvent = this.ackEveryEvent;
            acceptHandler.source = this;
            acceptHandler.serverSocket = this.serverSocket;
            acceptHandler.sourceEncoding = this.sourceEncoding;
            this.acceptThread = new Thread(acceptHandler);
            this.acceptThread.start();
            logger.debug("Source started");
            super.start();
        } catch (IOException e) {
            this.counterGroup.incrementAndGet("open.errors");
            logger.error("Unable to bind to socket. Exception follows.", e);
            throw new FlumeException(e);
        }
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Source stopping");
        this.acceptThreadShouldStop.set(true);
        if (this.acceptThread != null) {
            logger.debug("Stopping accept handler thread");
            while (this.acceptThread.isAlive()) {
                try {
                    logger.debug("Waiting for accept handler to finish");
                    this.acceptThread.interrupt();
                    this.acceptThread.join(500L);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for accept handler to finish");
                    Thread.currentThread().interrupt();
                }
            }
            logger.debug("Stopped accept handler thread");
        }
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            } catch (IOException e2) {
                logger.error("Unable to close socket. Exception follows.", e2);
                return;
            }
        }
        if (this.handlerService != null) {
            this.handlerService.shutdown();
            logger.debug("Waiting for handler service to stop");
            try {
                this.handlerService.awaitTermination(500L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e3) {
                logger.debug("Interrupted while waiting for netcat handler service to stop");
                Thread.currentThread().interrupt();
            }
            if (!this.handlerService.isShutdown()) {
                this.handlerService.shutdownNow();
            }
            logger.debug("Handler service stopped");
        }
        logger.debug("Source stopped. Event metrics:{}", this.counterGroup);
        super.stop();
    }
}
