/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.function.udaf.topk;

import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.util.ArrayUtil;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.kstream.Merger;

/**
 * Aggregate function that keeps the largest values seen for a column.
 *
 * <p>The aggregate is an array of {@code T}, kept sorted largest-first with unused
 * ({@code null}) slots at the end. The initial aggregate supplied to the base class is an
 * all-{@code null} array of length {@code topKSize}.
 *
 * @param <T> type of the values being ranked
 */
public class TopkKudaf<T extends Comparable<? super T>>
        extends KsqlAggregateFunction<T, T[]> {
    private final int topKSize;
    private final Class<T> clazz;
    private final Schema returnType;
    private final List<Schema> argumentTypes;
    /** Orders values largest-first and moves {@code null} (empty) slots to the end. */
    private final Comparator<T> comparator;

    /**
     * Creates a top-k aggregate.
     *
     * @param argIndexInValue passed through to the base class unchanged
     * @param topKSize length of the aggregate array, i.e. how many values are retained
     * @param returnType passed through to the base class and reused by {@link #getInstance}
     * @param argumentTypes passed through to the base class and reused by {@link #getInstance}
     * @param clazz runtime element type used to allocate the aggregate arrays
     */
    TopkKudaf(int argIndexInValue, int topKSize, Schema returnType, List<Schema> argumentTypes, Class<T> clazz) {
        super(argIndexInValue, () -> newArray(clazz, topKSize), returnType, argumentTypes);
        this.topKSize = topKSize;
        this.returnType = returnType;
        this.argumentTypes = argumentTypes;
        this.clazz = clazz;
        this.comparator = Comparator.nullsLast(Comparator.<T>reverseOrder());
    }

    /**
     * Folds one value into the aggregate.
     *
     * <p>The aggregate array is modified in place and the same array instance is returned.
     *
     * @param currentVal the incoming value; {@code null} is ignored
     * @param currentAggVal the current aggregate, sorted largest-first with nulls last
     * @return {@code currentAggVal}, updated if the value made it into the top k
     */
    @Override
    public T[] aggregate(T currentVal, T[] currentAggVal) {
        // A zero-length aggregate has no slot to fill or replace; without this guard the
        // "last element" access below would index -1.
        if (currentVal == null || currentAggVal.length == 0) {
            return currentAggVal;
        }
        int freeSlot = ArrayUtil.getNullIndex(currentAggVal);
        if (freeSlot != -1) {
            // Still room: take the free slot, then restore the ordering.
            currentAggVal[freeSlot] = currentVal;
            Arrays.sort(currentAggVal, this.comparator);
            return currentAggVal;
        }
        // Array is full: the last element is the smallest retained value.
        int lastIndex = currentAggVal.length - 1;
        if (currentVal.compareTo(currentAggVal[lastIndex]) <= 0) {
            return currentAggVal;
        }
        currentAggVal[lastIndex] = currentVal;
        Arrays.sort(currentAggVal, this.comparator);
        return currentAggVal;
    }

    /**
     * Returns a merger that combines two aggregates into one.
     *
     * <p>The populated leading parts of both arrays are concatenated and sorted largest-first.
     * The result is then padded with nulls (via {@code ArrayUtil.padWithNull}) or truncated so
     * that it is returned with {@code topKSize} as its target length.
     */
    @Override
    public Merger<String, T[]> getMerger() {
        return (aggKey, aggOne, aggTwo) -> {
            int countOne = filledCount(aggOne);
            int countTwo = filledCount(aggTwo);
            T[] merged = newArray(this.clazz, countOne + countTwo);
            System.arraycopy(aggOne, 0, merged, 0, countOne);
            System.arraycopy(aggTwo, 0, merged, countOne, countTwo);
            Arrays.sort(merged, this.comparator);
            if (merged.length < this.topKSize) {
                return ArrayUtil.padWithNull(this.clazz, merged, this.topKSize);
            }
            return Arrays.copyOf(merged, this.topKSize);
        };
    }

    /**
     * Builds a configured instance from the arguments of a function call.
     *
     * @param expressionNames maps an argument's string form to its index
     * @param functionArguments exactly two arguments: the expression to rank, then the value
     *     whose string form is parsed as the top-k size
     * @return a new instance sharing this instance's return type, argument types and element class
     * @throws KsqlException if the number of arguments is not 2
     */
    @Override
    public KsqlAggregateFunction<T, T[]> getInstance(Map<String, Integer> expressionNames, List<Expression> functionArguments) {
        int argCount = functionArguments.size();
        if (argCount != 2) {
            throw new KsqlException(String.format("Invalid parameter count. Need 2 args, got %d arg(s)", argCount));
        }
        String rankedExpression = functionArguments.get(0).toString();
        int udafIndex = expressionNames.get(rankedExpression);
        int requestedSize = Integer.parseInt(functionArguments.get(1).toString());
        return new TopkKudaf<>(udafIndex, requestedSize, this.returnType, this.argumentTypes, this.clazz);
    }

    /** Allocates an all-null array whose runtime component type is {@code elementType}. */
    @SuppressWarnings("unchecked") // Array.newInstance(elementType, n) always yields an E[]
    private static <E> E[] newArray(Class<E> elementType, int length) {
        return (E[]) Array.newInstance(elementType, length);
    }

    /**
     * Number of leading elements in use: the index reported by {@code ArrayUtil.getNullIndex},
     * or the whole length when it reports -1 (which the callers here treat as "no null slot").
     */
    private static <E> int filledCount(E[] aggregate) {
        int firstNull = ArrayUtil.getNullIndex(aggregate);
        return firstNull == -1 ? aggregate.length : firstNull;
    }
}

