/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.AbstractListState;
import org.apache.flink.state.forst.ContextKey;
import org.apache.flink.state.forst.ForStDBGetRequest;
import org.apache.flink.state.forst.ForStDBListGetRequest;
import org.apache.flink.state.forst.ForStDBMultiRawMergePutRequest;
import org.apache.flink.state.forst.ForStDBPutRequest;
import org.apache.flink.state.forst.ForStDBRawGetRequest;
import org.apache.flink.state.forst.ForStInnerTable;
import org.apache.flink.state.forst.ForStSerializerUtils;
import org.apache.flink.state.forst.ForStStateRequestType;
import org.apache.flink.state.forst.ListDelimitedSerializer;
import org.forstdb.ColumnFamilyHandle;

/**
 * List state whose contents live in a ForSt column family.
 *
 * <p>The class has two responsibilities that are visible in this file:
 *
 * <ul>
 *   <li>As a {@code ForStInnerTable}, it serializes keys/namespaces and list values and turns
 *       generic {@code StateRequest}s into ForSt get/put request objects.
 *   <li>It implements namespace merging (asynchronous and synchronous) by reading the raw
 *       serialized lists of the source namespaces, clearing those sources, and merging the raw
 *       bytes into the target namespace.
 * </ul>
 *
 * <p>Serializer helpers are held in {@link ThreadLocal}s, so every thread that touches this state
 * lazily gets its own instance from the suppliers passed to the constructor.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <V> type of the list elements
 */
public class ForStListState<K, N, V>
extends AbstractListState<K, N, V>
implements ListState<V>,
ForStInnerTable<K, N, List<V>> {
    /** Column family handle exposed through {@link #getColumnFamilyHandle()}. */
    private final ColumnFamilyHandle columnFamilyHandle;
    /** Per-thread builder used when serializing the key (and namespace). */
    private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
    /** Namespace handed to the key serialization helper as the default. */
    private final N defaultNamespace;
    /** Per-thread namespace serializer. */
    private final ThreadLocal<TypeSerializer<N>> namespaceSerializer;
    /** Per-thread output view that list values are serialized into. */
    private final ThreadLocal<DataOutputSerializer> valueSerializerView;
    /** Per-thread input view that list values are deserialized from. */
    private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
    /**
     * Flag forwarded to the key serialization helper; {@code true} only when the default
     * namespace is a {@code VoidNamespace} and the namespace serializer is a {@code
     * VoidNamespaceSerializer}.
     */
    private final boolean enableKeyReuse;

    /**
     * Creates the list state.
     *
     * @param stateRequestHandler handler passed to the super class
     * @param columnFamily column family handle of this state
     * @param listStateDescriptor descriptor passed to the super class
     * @param serializedKeyBuilderInitializer supplies the per-thread key builder
     * @param defaultNamespace default namespace used for key serialization
     * @param namespaceSerializerInitializer supplies the per-thread namespace serializer; it is
     *     additionally invoked once here to decide whether key reuse can be enabled
     * @param valueSerializerViewInitializer supplies the per-thread value output view
     * @param valueDeserializerViewInitializer supplies the per-thread value input view
     */
    public ForStListState(StateRequestHandler stateRequestHandler, ColumnFamilyHandle columnFamily, ListStateDescriptor<V> listStateDescriptor, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilderInitializer, N defaultNamespace, Supplier<TypeSerializer<N>> namespaceSerializerInitializer, Supplier<DataOutputSerializer> valueSerializerViewInitializer, Supplier<DataInputDeserializer> valueDeserializerViewInitializer) {
        super(stateRequestHandler, listStateDescriptor);
        this.columnFamilyHandle = columnFamily;
        this.serializedKeyBuilder = ThreadLocal.withInitial(serializedKeyBuilderInitializer);
        this.defaultNamespace = defaultNamespace;
        this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer);
        this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer);
        this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer);
        this.enableKeyReuse = defaultNamespace instanceof VoidNamespace && namespaceSerializerInitializer.get() instanceof VoidNamespaceSerializer;
    }

    /** Returns the column family handle this state was constructed with. */
    @Override
    public ColumnFamilyHandle getColumnFamilyHandle() {
        return this.columnFamilyHandle;
    }

    /**
     * Serializes the key and namespace of the given context key using the calling thread's key
     * builder and namespace serializer.
     *
     * @param contextKey the key/namespace pair to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    @Override
    public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException {
        return ForStSerializerUtils.serializeKeyAndNamespace(contextKey, this.serializedKeyBuilder.get(), this.defaultNamespace, this.namespaceSerializer.get(), this.enableKeyReuse);
    }

    /**
     * Serializes a list of values via {@code ListDelimitedSerializer}.
     *
     * @param valueList the list to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    @Override
    public byte[] serializeValue(List<V> valueList) throws IOException {
        DataOutputSerializer outputView = this.valueSerializerView.get();
        // The view is reused per thread, so drop whatever the previous call left in it.
        outputView.clear();
        return ListDelimitedSerializer.serializeList(valueList, this.getValueSerializer(), outputView);
    }

    /**
     * Deserializes a list of values via {@code ListDelimitedSerializer}.
     *
     * @param valueBytes the serialized list
     * @return the deserialized list
     * @throws IOException if deserialization fails
     */
    @Override
    public List<V> deserializeValue(byte[] valueBytes) throws IOException {
        DataInputDeserializer inputView = this.valueDeserializerView.get();
        inputView.setBuffer(valueBytes);
        return ListDelimitedSerializer.deserializeList(valueBytes, this.getValueSerializer(), inputView);
    }

    /**
     * Builds the ForSt read request for a state request.
     *
     * <p>{@code LIST_GET} becomes a {@code ForStDBListGetRequest}; {@code CUSTOMIZED} becomes a
     * {@code ForStDBRawGetRequest} whose future receives raw bytes.
     *
     * @param stateRequest the request to translate
     * @return the ForSt get request
     * @throws UnsupportedOperationException for any other request type
     */
    // Raw types: the element types are only known through the wildcard-typed StateRequest.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public ForStDBGetRequest<K, N, List<V>, ?> buildDBGetRequest(StateRequest<?, ?, ?, ?> stateRequest) {
        ContextKey contextKey = new ContextKey(stateRequest.getRecordContext(), stateRequest.getNamespace());
        switch (stateRequest.getRequestType()) {
            case LIST_GET: {
                return new ForStDBListGetRequest(contextKey, this, stateRequest.getFuture());
            }
            case CUSTOMIZED: {
                return new ForStDBRawGetRequest(contextKey, this, (InternalStateFuture<byte[]>)stateRequest.getFuture());
            }
            default: {
                throw new UnsupportedOperationException("Unsupported get request type: " + stateRequest.getRequestType());
            }
        }
    }

    /**
     * Builds the ForSt write request for a state request.
     *
     * <ul>
     *   <li>{@code CLEAR}: plain put with a {@code null} value.
     *   <li>{@code LIST_UPDATE}: plain put of the payload list.
     *   <li>{@code LIST_ADD}: merge of a singleton list wrapping the payload.
     *   <li>{@code LIST_ADD_ALL}: merge of the payload list.
     *   <li>{@code CUSTOMIZED}: {@code ForStDBMultiRawMergePutRequest} over the collection in the
     *       payload tuple's second field.
     * </ul>
     *
     * @param stateRequest the request to translate
     * @return the ForSt put request
     * @throws IllegalArgumentException for any other request type
     */
    // Raw types: the element types are only known through the wildcard-typed StateRequest.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public ForStDBPutRequest<K, N, List<V>> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest) {
        List value;
        ContextKey contextKey = new ContextKey(stateRequest.getRecordContext(), stateRequest.getNamespace());
        boolean merge = false;
        switch (stateRequest.getRequestType()) {
            case CLEAR: {
                value = null;
                break;
            }
            case LIST_UPDATE: {
                value = (List)stateRequest.getPayload();
                break;
            }
            case LIST_ADD: {
                value = Collections.singletonList(stateRequest.getPayload());
                merge = true;
                break;
            }
            case LIST_ADD_ALL: {
                value = (List)stateRequest.getPayload();
                merge = true;
                break;
            }
            case CUSTOMIZED: {
                return new ForStDBMultiRawMergePutRequest(contextKey, (Collection)((Tuple2)stateRequest.getPayload()).f1, this, (InternalStateFuture<Void>)stateRequest.getFuture());
            }
            default: {
                throw new IllegalArgumentException("Unsupported put request type: " + stateRequest.getRequestType());
            }
        }
        if (merge) {
            return ForStDBPutRequest.ofMerge(contextKey, value, this, (InternalStateFuture<Void>)stateRequest.getFuture());
        }
        return ForStDBPutRequest.of(contextKey, value, this, (InternalStateFuture<Void>)stateRequest.getFuture());
    }

    /**
     * Asynchronously moves the contents of all source namespaces into the target namespace.
     *
     * <p>Phase 1 issues one raw read per non-null source. Phase 2, once all reads are done, clears
     * every source that actually held data and merges the collected raw values into {@code
     * target}. {@code null} entries in {@code sources} are ignored.
     *
     * @param target namespace that receives the merged values
     * @param sources namespaces to merge from; {@code null} or empty means nothing to do
     * @return a future that completes when all clears and the merge have completed
     */
    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        // Filter the null sources exactly once. Both phases below walk this same list, which keeps
        // the i-th read result paired with the i-th namespace; filtering in only one of the phases
        // would shift the pairing and run the result iterator past its end.
        List<N> nonNullSources = new ArrayList<>(sources.size());
        for (N source : sources) {
            if (source != null) {
                nonNullSources.add(source);
            }
        }
        if (nonNullSources.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        // Phase 1: read the raw bytes of every source namespace.
        List<StateFuture<byte[]>> futures = new ArrayList<>(nonNullSources.size());
        for (N source : nonNullSources) {
            this.setCurrentNamespace(source);
            futures.add(this.handleRequest(StateRequestType.CUSTOMIZED, Tuple2.of(ForStStateRequestType.LIST_GET_RAW, null)));
        }
        // Phase 2: clear the sources that held data and merge their bytes into the target.
        return StateFutureUtils.combineAll(futures).thenCompose(values -> {
            List<StateFuture<Void>> updateFutures = new ArrayList<>(nonNullSources.size() + 1);
            List<byte[]> validValues = new ArrayList<>(nonNullSources.size());
            Iterator<byte[]> valueIterator = values.iterator();
            for (N source : nonNullSources) {
                byte[] value = valueIterator.next();
                if (value == null) {
                    continue;
                }
                validValues.add(value);
                this.setCurrentNamespace(source);
                updateFutures.add(this.asyncClear());
            }
            if (!validValues.isEmpty()) {
                this.setCurrentNamespace(target);
                updateFutures.add(this.handleRequest(StateRequestType.CUSTOMIZED, Tuple2.of(ForStStateRequestType.MERGE_ALL_RAW, validValues)));
            }
            return StateFutureUtils.combineAll(updateFutures);
        }).thenAccept(ignores -> {});
    }

    /**
     * Synchronously moves the contents of all source namespaces into the target namespace.
     *
     * <p>For every non-null source the raw bytes are read; if present, the source is cleared and
     * the bytes are collected. All collected values are then merged into {@code target} with a
     * single request.
     *
     * @param target namespace that receives the merged values
     * @param sources namespaces to merge from; {@code null} or empty means nothing to do
     * @throws RuntimeException wrapping any exception raised while reading, clearing or merging
     */
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        try {
            List<byte[]> validValues = new ArrayList<>(sources.size());
            for (N source : sources) {
                if (source == null) {
                    continue;
                }
                this.setCurrentNamespace(source);
                byte[] oldValue = (byte[])this.handleRequestSync(StateRequestType.CUSTOMIZED, Tuple2.of(ForStStateRequestType.LIST_GET_RAW, null));
                if (oldValue == null) {
                    continue;
                }
                // Namespace is set again right before the clear, as in the original code.
                this.setCurrentNamespace(source);
                this.clear();
                validValues.add(oldValue);
            }
            if (!validValues.isEmpty()) {
                this.setCurrentNamespace(target);
                this.handleRequestSync(StateRequestType.CUSTOMIZED, Tuple2.of(ForStStateRequestType.MERGE_ALL_RAW, validValues));
            }
        }
        catch (Exception e) {
            throw new RuntimeException("merge namespace fail.", e);
        }
    }
}

