/*
 * Decompiled with CFR 0.152.
 */
package com.nvidia.spark.rapids.iceberg.spark.source;

import ai.rapids.cudf.ColumnView;
import ai.rapids.cudf.Scalar;
import com.nvidia.spark.rapids.GpuCast;
import com.nvidia.spark.rapids.GpuColumnVector;
import com.nvidia.spark.rapids.GpuScalar;
import com.nvidia.spark.rapids.iceberg.data.GpuDeleteFilter;
import com.nvidia.spark.rapids.iceberg.spark.SparkSchemaUtil;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

/**
 * Iterator over the {@link ColumnarBatch}es produced by a {@link PartitionReader} that reshapes
 * every batch to the expected Iceberg schema before handing it out.
 *
 * <p>For each batch this reader
 * <ol>
 *   <li>inserts a constant column for every top-level expected field whose id is a key of
 *       {@code idToConstant}, and</li>
 *   <li>casts every column to the Spark type derived from the expected schema.</li>
 * </ol>
 *
 * <p>Delete filters are not supported: {@link #next()} throws if one was supplied.
 */
public class GpuIcebergReader implements CloseableIterator<ColumnarBatch> {
    private final Schema expectedSchema;
    private final PartitionReader<ColumnarBatch> partReader;
    private final GpuDeleteFilter deleteFilter;
    private final Map<Integer, ?> idToConstant;
    /** True when {@code partReader.next()} has to be called before the next answer of hasNext(). */
    private boolean needNext = true;
    /** Cached result of the last {@code partReader.next()} call that was not yet consumed. */
    private boolean isBatchPending;

    /**
     * @param expectedSchema schema every returned batch must conform to
     * @param partReader     underlying reader supplying the raw batches
     * @param deleteFilter   must be {@code null}; a non-null filter makes {@link #next()} throw
     * @param idToConstant   constant values keyed by Iceberg field id
     */
    public GpuIcebergReader(Schema expectedSchema, PartitionReader<ColumnarBatch> partReader,
            GpuDeleteFilter deleteFilter, Map<Integer, ?> idToConstant) {
        this.expectedSchema = expectedSchema;
        this.partReader = partReader;
        this.deleteFilter = deleteFilter;
        this.idToConstant = idToConstant;
    }

    /** Closes the underlying partition reader. */
    public void close() throws IOException {
        partReader.close();
    }

    /**
     * Reports whether another batch is available, advancing the underlying reader at most once
     * per consumed batch.
     *
     * @throws UncheckedIOException if the underlying reader fails with an {@link IOException}
     */
    public boolean hasNext() {
        if (needNext) {
            try {
                isBatchPending = partReader.next();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            needNext = false;
        }
        return isBatchPending;
    }

    /**
     * Returns the next batch, reshaped to the expected schema.
     *
     * @throws NoSuchElementException        if no batch is left
     * @throws UnsupportedOperationException if a delete filter was configured
     */
    public ColumnarBatch next() {
        if (!hasNext()) {
            throw new NoSuchElementException("No more batches to iterate");
        }
        isBatchPending = false;
        needNext = true;
        // The raw batch is always closed here; addConstantColumns takes its own references.
        try (ColumnarBatch batch = partReader.get()) {
            if (deleteFilter != null) {
                throw new UnsupportedOperationException("Delete filter is not supported");
            }
            ColumnarBatch updatedBatch = addConstantColumns(batch);
            return addUpcastsIfNeeded(updatedBatch);
        }
    }

    /**
     * Builds a new batch laid out like the expected schema: constant columns are materialized
     * for fields found in {@code idToConstant}, all other fields are taken, in order, from
     * {@code batch}.
     *
     * <p>The input batch is not closed. If anything fails, every column created or retained so
     * far is closed before the exception propagates.
     *
     * @throws UnsupportedOperationException if a non-constant field contains a nested constant
     * @throws IllegalStateException         if not every input column was consumed
     */
    private ColumnarBatch addConstantColumns(ColumnarBatch batch) {
        ColumnVector[] columns = new ColumnVector[expectedSchema.columns().size()];
        ColumnarBatch result = null;
        ConstantDetector constantDetector = new ConstantDetector(idToConstant);
        try {
            int inputIdx = 0;
            int outputIdx = 0;
            for (Types.NestedField field : expectedSchema.columns()) {
                // containsKey (not get() != null) so a key mapped to null still counts.
                if (idToConstant.containsKey(field.fieldId())) {
                    DataType type = SparkSchemaUtil.convert(field.type());
                    // try-with-resources closes the scalar AND lets a failure in from()
                    // propagate instead of leaving a null entry in the output array.
                    try (Scalar scalar = GpuScalar.from(idToConstant.get(field.fieldId()), type)) {
                        columns[outputIdx++] = GpuColumnVector.from(scalar, batch.numRows(), type);
                    }
                } else {
                    if (TypeUtil.visit(field.type(), constantDetector)) {
                        throw new UnsupportedOperationException(
                                "constants not implemented for nested field");
                    }
                    GpuColumnVector gpuColumn = (GpuColumnVector) batch.column(inputIdx++);
                    // Extra reference: the caller still closes the input batch.
                    columns[outputIdx++] = gpuColumn.incRefCount();
                }
            }
            if (inputIdx != batch.numCols()) {
                throw new IllegalStateException("Did not consume all input batch columns");
            }
            result = new ColumnarBatch(columns, batch.numRows());
        } finally {
            if (result == null) {
                // Failure path: release whatever was placed into the output array.
                for (ColumnVector c : columns) {
                    if (c != null) {
                        c.close();
                    }
                }
            }
        }
        return result;
    }

    /**
     * Casts every column of {@code batch} to the Spark type of the matching expected field and
     * returns the casted columns as a new batch. The input batch is always closed.
     *
     * @throws IllegalStateException if the batch column count differs from the expected schema
     */
    private ColumnarBatch addUpcastsIfNeeded(ColumnarBatch batch) {
        GpuColumnVector[] columns = null;
        try {
            List<Types.NestedField> expectedColumnTypes = expectedSchema.columns();
            Preconditions.checkState(expectedColumnTypes.size() == batch.numCols(),
                    "Expected to load " + expectedColumnTypes.size() + " columns, found "
                            + batch.numCols());
            columns = GpuColumnVector.extractColumns(batch);
            for (int i = 0; i < batch.numCols(); ++i) {
                DataType expectedSparkType = SparkSchemaUtil.convert(expectedColumnTypes.get(i).type());
                GpuColumnVector oldColumn = columns[i];
                columns[i] = GpuColumnVector.from(
                        GpuCast.doCast(oldColumn.getBase(), oldColumn.dataType(), expectedSparkType,
                                false, false, false),
                        expectedSparkType);
            }
            ColumnarBatch newBatch = new ColumnarBatch(columns, batch.numRows());
            // Ownership moved to newBatch; stop the finally block from closing the columns.
            columns = null;
            return newBatch;
        } finally {
            batch.close();
            // NOTE(review): if extractColumns returns the batch's own columns without taking
            // new references, entries not yet replaced by a cast are closed twice on this
            // failure path (once by batch.close() above) - confirm.
            if (columns != null) {
                for (ColumnVector c : columns) {
                    c.close();
                }
            }
        }
    }

    /**
     * Schema visitor answering whether a type contains a field whose id is a key of
     * {@code idToConstant}.
     *
     * <p>NOTE(review): {@code field}, {@code list} and {@code map} look only at the ids of their
     * direct fields and ignore the results computed for child types, so constants nested more
     * than one level deep may go undetected - confirm whether that is intended.
     */
    private static class ConstantDetector extends TypeUtil.SchemaVisitor<Boolean> {
        private final Map<Integer, ?> idToConstant;

        ConstantDetector(Map<Integer, ?> idToConstant) {
            this.idToConstant = idToConstant;
        }

        /** A schema has a constant exactly when its root struct has one. */
        public Boolean schema(Schema schema, Boolean structResult) {
            return structResult;
        }

        /** A struct has a constant when any of its fields reported one. */
        public Boolean struct(Types.StructType struct, List<Boolean> fieldResults) {
            return fieldResults.stream().anyMatch(b -> b);
        }

        /** A field counts as constant when its own id is a key of the constant map. */
        public Boolean field(Types.NestedField field, Boolean fieldResult) {
            return idToConstant.containsKey(field.fieldId());
        }

        /** Checks the ids of the list's own fields against the constant map. */
        public Boolean list(Types.ListType list, Boolean elementResult) {
            return list.fields().stream().anyMatch(f -> idToConstant.containsKey(f.fieldId()));
        }

        /** Checks the ids of the map's own fields against the constant map. */
        public Boolean map(Types.MapType map, Boolean keyResult, Boolean valueResult) {
            return map.fields().stream().anyMatch(f -> idToConstant.containsKey(f.fieldId()));
        }

        /** Primitives have no nested fields and therefore never contain a constant. */
        public Boolean primitive(Type.PrimitiveType primitive) {
            return false;
        }
    }
}

