package org.apache.drill.exec.store.pcap;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.store.pcap.decoder.Packet;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcap.decoder.TcpSession;
import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.exec.store.pcap.schema.Schema;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/pcap/PcapBatchReader.class */
/**
 * Batch reader that decodes a PCAP capture file into Drill rows.
 *
 * <p>The file is streamed through a fixed-size byte buffer and decoded one packet at a time with a
 * {@link PacketDecoder}. Depending on {@link PcapFormatConfig#getSessionizeTCPStreams()} the reader
 * either writes one row per packet, or groups packets by their session hash and writes one row per
 * TCP session once that session reports its connection as closed.
 *
 * <p>Buffer bookkeeping: bytes in {@code buffer[0, validBytes)} have been read from the stream, and
 * bytes in {@code buffer[offset, validBytes)} have not been decoded yet.
 */
public class PcapBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
    /** Base size of the read buffer in bytes; the decoder's maximum packet length is added on top. */
    protected static final int BUFFER_SIZE = 500000;
    private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);

    // Bit masks applied to Packet.getFlags(); each one feeds the tcp_flags_* column of the same name.
    private static final int TCP_FLAG_NS = 0x100;
    private static final int TCP_FLAG_CWR = 0x80;
    private static final int TCP_FLAG_ECE = 0x40;
    private static final int TCP_FLAG_URG = 0x20;
    private static final int TCP_FLAG_ACK = 0x10;
    private static final int TCP_FLAG_PSH = 0x08;
    private static final int TCP_FLAG_RST = 0x04;
    private static final int TCP_FLAG_SYN = 0x02;
    private static final int TCP_FLAG_FIN = 0x01;
    /** ECE and SYN together; used to tell "ECN capable" (both set) from "congestion experienced" (ECE only). */
    private static final int TCP_FLAG_ECE_SYN = TCP_FLAG_ECE | TCP_FLAG_SYN;

    private final PcapFormatConfig readerConfig;
    private final int maxRecords;
    /** Open sessions keyed by session hash; {@code null} unless the config requested sessionizing. */
    private final Map<Long, TcpSession> sessionQueue;

    private FileSplit split;
    private PacketDecoder decoder;
    private InputStream fsStream;
    private RowSetLoader rowWriter;

    /** Number of bytes in {@link #buffer} that hold data read from the stream. */
    private int validBytes;
    private byte[] buffer;
    /** Position in {@link #buffer} of the next packet to decode. */
    private int offset;

    // Column writers used in per-packet mode.
    private ScalarWriter typeWriter;
    private ScalarWriter timestampWriter;
    private ScalarWriter timestampMicroWriter;
    private ScalarWriter networkWriter;
    private ScalarWriter packetLengthWriter;
    private ScalarWriter tcpSequenceWriter;
    private ScalarWriter tcpAckNumberWriter;
    private ScalarWriter tcpFlagsWriter;
    private ScalarWriter tcpParsedFlagsWriter;
    private ScalarWriter tcpNsWriter;
    private ScalarWriter tcpCwrWriter;
    private ScalarWriter tcpEceWriter;
    private ScalarWriter tcpFlagsEceEcnCapableWriter;
    private ScalarWriter tcpFlagsCongestionWriter;
    private ScalarWriter tcpUrgWriter;
    private ScalarWriter tcpAckWriter;
    private ScalarWriter tcpPshWriter;
    private ScalarWriter tcpRstWriter;
    private ScalarWriter tcpSynWriter;
    private ScalarWriter tcpFinWriter;
    private ScalarWriter dataWriter;

    // Column writers used in both modes.
    private ScalarWriter srcMacAddressWriter;
    private ScalarWriter dstMacAddressWriter;
    private ScalarWriter dstIPWriter;
    private ScalarWriter srcIPWriter;
    private ScalarWriter srcPortWriter;
    private ScalarWriter dstPortWriter;
    private ScalarWriter tcpSessionWriter;
    private ScalarWriter isCorruptWriter;

    // Column writers used in sessionized mode.
    private ScalarWriter sessionStartTimeWriter;
    private ScalarWriter sessionEndTimeWriter;
    private ScalarWriter sessionDurationWriter;
    private ScalarWriter connectionTimeWriter;
    private ScalarWriter packetCountWriter;
    private ScalarWriter originPacketCounterWriter;
    private ScalarWriter remotePacketCounterWriter;
    private ScalarWriter originDataVolumeWriter;
    private ScalarWriter remoteDataVolumeWriter;
    private ScalarWriter hostDataWriter;
    private ScalarWriter remoteDataWriter;

    /**
     * Creates a reader for one PCAP file.
     *
     * @param pcapFormatConfig format configuration; decides whether TCP streams are sessionized
     * @param maxRecords record limit handed to {@link RowSetLoader#limitReached(int)}
     */
    public PcapBatchReader(PcapFormatConfig pcapFormatConfig, int maxRecords) {
        this.readerConfig = pcapFormatConfig;
        this.sessionQueue = pcapFormatConfig.getSessionizeTCPStreams() ? new HashMap<>() : null;
        this.maxRecords = maxRecords;
    }

    /**
     * Opens the file behind the negotiator's split, declares the table schema and binds the column
     * writers.
     *
     * @param fileSchemaNegotiator negotiator supplying the split, the file system and the row writer
     * @return always {@code true}
     * @throws UserException if the file cannot be opened or its first bytes cannot be read
     */
    public boolean open(FileScanFramework.FileSchemaNegotiator fileSchemaNegotiator) {
        this.split = fileSchemaNegotiator.split();
        openFile(fileSchemaNegotiator);
        Schema schema = new Schema(this.readerConfig.getSessionizeTCPStreams());
        fileSchemaNegotiator.tableSchema(schema.buildSchema(new SchemaBuilder()), false);
        this.rowWriter = fileSchemaNegotiator.build().writer();
        populateColumnWriters(this.rowWriter);
        return true;
    }

    /**
     * Decodes packets until the row writer reports it is full or no more packets can be parsed.
     *
     * @return {@code true} if the row writer filled up, {@code false} once parsing stopped
     */
    public boolean next() {
        while (!this.rowWriter.isFull()) {
            if (!parseNextPacket(this.rowWriter)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Releases the input stream and the decode buffer. Safe to call when the file was never opened.
     *
     * @throws UserException if closing the underlying stream fails
     */
    public void close() {
        if (this.sessionQueue != null && !this.sessionQueue.isEmpty()) {
            logger.warn("Unclosed sessions remaining in PCAP");
        }
        // Drop the references first so the reader is released even when closing the stream fails.
        InputStream stream = this.fsStream;
        this.fsStream = null;
        this.buffer = null;
        this.decoder = null;
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            throw UserException.dataReadError(e).addContext("Error closing InputStream: " + e.getMessage()).build(logger);
        }
    }

    /**
     * Opens the (possibly compressed) stream, creates the decoder on top of it and primes the buffer
     * with a first read. If any of those steps fails the stream is closed again before the error is
     * reported.
     */
    private void openFile(FileScanFramework.FileSchemaNegotiator fileSchemaNegotiator) {
        try {
            this.fsStream = fileSchemaNegotiator.fileSystem().openPossiblyCompressedStream(this.split.getPath());
            this.decoder = new PacketDecoder(this.fsStream);
            this.buffer = new byte[BUFFER_SIZE + this.decoder.getMaxLength()];
            this.validBytes = this.fsStream.read(this.buffer);
        } catch (IOException e) {
            if (this.fsStream != null) {
                try {
                    this.fsStream.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                this.fsStream = null;
            }
            throw UserException.dataReadError(e).addContext("File name:", this.split.getPath().toString()).build(logger);
        }
    }

    /** Looks up the column writers that belong to the active mode (sessionized or per-packet). */
    private void populateColumnWriters(RowSetLoader rowSetLoader) {
        // Columns present in both schemas.
        this.srcMacAddressWriter = rowSetLoader.scalar("src_mac_address");
        this.dstMacAddressWriter = rowSetLoader.scalar("dst_mac_address");
        this.dstIPWriter = rowSetLoader.scalar("dst_ip");
        this.srcIPWriter = rowSetLoader.scalar("src_ip");
        this.srcPortWriter = rowSetLoader.scalar("src_port");
        this.dstPortWriter = rowSetLoader.scalar("dst_port");
        this.tcpSessionWriter = rowSetLoader.scalar("tcp_session");
        this.isCorruptWriter = rowSetLoader.scalar("is_corrupt");

        if (this.readerConfig.getSessionizeTCPStreams()) {
            this.sessionStartTimeWriter = rowSetLoader.scalar("session_start_time");
            this.sessionEndTimeWriter = rowSetLoader.scalar("session_end_time");
            this.sessionDurationWriter = rowSetLoader.scalar("session_duration");
            this.connectionTimeWriter = rowSetLoader.scalar("connection_time");
            this.packetCountWriter = rowSetLoader.scalar("total_packet_count");
            this.hostDataWriter = rowSetLoader.scalar("data_from_originator");
            this.remoteDataWriter = rowSetLoader.scalar("data_from_remote");
            this.originPacketCounterWriter = rowSetLoader.scalar("packet_count_from_origin");
            this.remotePacketCounterWriter = rowSetLoader.scalar("packet_count_from_remote");
            this.originDataVolumeWriter = rowSetLoader.scalar("data_volume_from_origin");
            this.remoteDataVolumeWriter = rowSetLoader.scalar("data_volume_from_remote");
            return;
        }

        this.typeWriter = rowSetLoader.scalar("type");
        this.timestampWriter = rowSetLoader.scalar("packet_timestamp");
        this.timestampMicroWriter = rowSetLoader.scalar("timestamp_micro");
        this.networkWriter = rowSetLoader.scalar("network");
        this.packetLengthWriter = rowSetLoader.scalar("packet_length");
        this.tcpSequenceWriter = rowSetLoader.scalar("tcp_sequence");
        this.tcpAckNumberWriter = rowSetLoader.scalar("tcp_ack");
        this.tcpFlagsWriter = rowSetLoader.scalar("tcp_flags");
        this.tcpParsedFlagsWriter = rowSetLoader.scalar("tcp_parsed_flags");
        this.tcpNsWriter = rowSetLoader.scalar("tcp_flags_ns");
        this.tcpCwrWriter = rowSetLoader.scalar("tcp_flags_cwr");
        this.tcpEceWriter = rowSetLoader.scalar("tcp_flags_ece");
        this.tcpFlagsEceEcnCapableWriter = rowSetLoader.scalar("tcp_flags_ece_ecn_capable");
        this.tcpFlagsCongestionWriter = rowSetLoader.scalar("tcp_flags_ece_congestion_experienced");
        this.tcpUrgWriter = rowSetLoader.scalar("tcp_flags_urg");
        this.tcpAckWriter = rowSetLoader.scalar("tcp_flags_ack");
        this.tcpPshWriter = rowSetLoader.scalar("tcp_flags_psh");
        this.tcpRstWriter = rowSetLoader.scalar("tcp_flags_rst");
        this.tcpSynWriter = rowSetLoader.scalar("tcp_flags_syn");
        this.tcpFinWriter = rowSetLoader.scalar("tcp_flags_fin");
        this.dataWriter = rowSetLoader.scalar("data");
    }

    /**
     * Decodes the next packet from the buffer and either writes it as a row or adds it to its TCP
     * session, emitting the session row once the session reports its connection as closed.
     *
     * @return {@code false} when the record limit is reached or the buffer holds no undecoded bytes,
     *         {@code true} otherwise
     */
    private boolean parseNextPacket(RowSetLoader rowSetLoader) {
        if (rowSetLoader.limitReached(this.maxRecords)) {
            return false;
        }
        if (this.offset >= this.validBytes) {
            return false;
        }
        int maxLength = this.decoder.getMaxLength();
        if (this.validBytes - this.offset < maxLength) {
            // Fewer bytes than a maximum-size packet remain: try to pull more from the stream.
            refillBuffer();
        }

        Packet packet = new Packet();
        int packetStart = this.offset;
        this.offset = this.decoder.decodePacket(this.buffer, this.offset, packet, maxLength, this.validBytes);
        if (this.offset > this.validBytes) {
            // The decoder advanced past the data actually read, so the packet is flagged as corrupt.
            packet.setIsCorrupt(true);
            logger.debug("Invalid packet at offset {}", packetStart);
        }

        if (!this.readerConfig.getSessionizeTCPStreams()) {
            addDataToTable(packet, this.decoder.getNetwork(), rowSetLoader);
            return true;
        }

        long sessionHash = packet.getSessionHash();
        Long sessionKey = sessionHash;
        TcpSession session = this.sessionQueue.get(sessionKey);
        if (session == null) {
            logger.debug("Adding session {} to session queue.", sessionKey);
            session = new TcpSession(sessionHash);
            this.sessionQueue.put(sessionKey, session);
        }
        session.addPacket(packet);
        if (session.connectionClosed()) {
            addSessionDataToTable(session, rowSetLoader);
            this.sessionQueue.remove(sessionKey);
        }
        return true;
    }

    /**
     * Pulls more bytes from the stream into the buffer with a single read.
     *
     * <p>When the buffer is completely full the undecoded tail is first moved to the front to make
     * room. In both cases the new bytes are appended at {@code validBytes}, i.e. after the data that
     * is already buffered, so bytes that have not been decoded yet are never overwritten.
     *
     * @throws UserException if reading from the stream fails
     */
    private void refillBuffer() {
        try {
            if (this.validBytes == this.buffer.length) {
                int undecoded = this.validBytes - this.offset;
                System.arraycopy(this.buffer, this.offset, this.buffer, 0, undecoded);
                this.validBytes = undecoded;
                this.offset = 0;
                int read = this.fsStream.read(this.buffer, this.validBytes, this.buffer.length - this.validBytes);
                if (read > 0) {
                    this.validBytes += read;
                }
                logger.debug("read {} bytes, at {} offset", read, this.validBytes);
            } else {
                int read = this.fsStream.read(this.buffer, this.validBytes, this.buffer.length - this.validBytes);
                if (read > 0) {
                    this.validBytes += read;
                    logger.debug("Topped up buffer with {} bytes to yield {}", read, this.validBytes);
                }
            }
        } catch (IOException e) {
            // A failed read must not be mistaken for end of file; report it like a failed open.
            throw UserException.dataReadError(e).addContext("File name:", this.split.getPath().toString()).build(logger);
        }
    }

    /** Writes one row summarizing the given TCP session. */
    private void addSessionDataToTable(TcpSession tcpSession, RowSetLoader rowSetLoader) {
        rowSetLoader.start();
        this.sessionStartTimeWriter.setTimestamp(tcpSession.getSessionStartTime());
        this.sessionEndTimeWriter.setTimestamp(tcpSession.getSessionEndTime());
        this.sessionDurationWriter.setPeriod(tcpSession.getSessionDuration());
        this.connectionTimeWriter.setPeriod(tcpSession.getConnectionTime());
        this.srcMacAddressWriter.setString(tcpSession.getSrcMac());
        this.dstMacAddressWriter.setString(tcpSession.getDstMac());
        this.srcIPWriter.setString(tcpSession.getSrcIP());
        this.dstIPWriter.setString(tcpSession.getDstIP());
        this.srcPortWriter.setInt(tcpSession.getSrcPort());
        this.dstPortWriter.setInt(tcpSession.getDstPort());
        this.tcpSessionWriter.setLong(tcpSession.getSessionID());
        this.packetCountWriter.setInt(tcpSession.getPacketCount());
        this.originPacketCounterWriter.setInt(tcpSession.getPacketCountFromOrigin());
        this.remotePacketCounterWriter.setInt(tcpSession.getPacketCountFromRemote());
        this.originDataVolumeWriter.setInt(tcpSession.getDataFromOriginator().length);
        this.remoteDataVolumeWriter.setInt(tcpSession.getDataFromRemote().length);
        this.isCorruptWriter.setBoolean(tcpSession.hasCorruptedData());
        this.hostDataWriter.setString(tcpSession.getDataFromOriginatorAsString());
        this.remoteDataWriter.setString(tcpSession.getDataFromRemoteAsString());
        rowSetLoader.save();
    }

    /**
     * Writes one row describing a single packet.
     *
     * @param packet the decoded packet
     * @param network value for the {@code network} column, taken from the decoder by the caller
     * @param rowSetLoader row writer receiving the row
     */
    private void addDataToTable(Packet packet, int network, RowSetLoader rowSetLoader) {
        rowSetLoader.start();
        this.typeWriter.setString(packet.getPacketType());
        this.timestampWriter.setTimestamp(Instant.ofEpochMilli(packet.getTimestamp()));
        this.timestampMicroWriter.setLong(packet.getTimestampMicro());
        this.networkWriter.setInt(network);
        this.srcMacAddressWriter.setString(packet.getEthernetSource());
        this.dstMacAddressWriter.setString(packet.getEthernetDestination());
        setStringOrNull(this.dstIPWriter, packet.getDestinationIpAddressString());
        setStringOrNull(this.srcIPWriter, packet.getSourceIpAddressString());
        this.srcPortWriter.setInt(packet.getSrc_port());
        this.dstPortWriter.setInt(packet.getDst_port());
        this.packetLengthWriter.setInt(packet.getPacketLength());
        this.tcpSessionWriter.setLong(packet.getSessionHash());
        this.tcpSequenceWriter.setInt(packet.getSequenceNumber());
        this.tcpAckNumberWriter.setInt(packet.getAckNumber());

        int flags = packet.getFlags();
        this.tcpFlagsWriter.setInt(flags);
        this.tcpParsedFlagsWriter.setString(packet.getParsedFlags());
        this.tcpNsWriter.setBoolean((flags & TCP_FLAG_NS) != 0);
        this.tcpCwrWriter.setBoolean((flags & TCP_FLAG_CWR) != 0);
        this.tcpEceWriter.setBoolean((flags & TCP_FLAG_ECE) != 0);
        // ECE together with SYN is reported as "ECN capable"; ECE without SYN as "congestion experienced".
        this.tcpFlagsEceEcnCapableWriter.setBoolean((flags & TCP_FLAG_ECE_SYN) == TCP_FLAG_ECE_SYN);
        this.tcpFlagsCongestionWriter.setBoolean((flags & TCP_FLAG_ECE_SYN) == TCP_FLAG_ECE);
        this.tcpUrgWriter.setBoolean((flags & TCP_FLAG_URG) != 0);
        this.tcpAckWriter.setBoolean((flags & TCP_FLAG_ACK) != 0);
        this.tcpPshWriter.setBoolean((flags & TCP_FLAG_PSH) != 0);
        this.tcpRstWriter.setBoolean((flags & TCP_FLAG_RST) != 0);
        this.tcpSynWriter.setBoolean((flags & TCP_FLAG_SYN) != 0);
        this.tcpFinWriter.setBoolean((flags & TCP_FLAG_FIN) != 0);

        this.dataWriter.setString(PcapFormatUtils.parseBytesToASCII(packet.getData()));
        this.isCorruptWriter.setBoolean(packet.isCorrupt());
        rowSetLoader.save();
    }

    /** Writes {@code value} to the column, or an explicit null when {@code value} is {@code null}. */
    private static void setStringOrNull(ScalarWriter writer, String value) {
        if (value == null) {
            writer.setNull();
        } else {
            writer.setString(value);
        }
    }
}
