/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.io.decode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Callable;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;

public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
implements Consumer<BatchType>,
ReadPipeline {
    private volatile boolean isStopped = false;
    private final HashMap<BatchKey, BatchType> pendingData = new HashMap();
    private ConsumerFeedback<BatchType> upstreamFeedback;
    private final Consumer<ColumnVectorBatch> downstreamConsumer;
    private Callable<Void> readCallable;
    private final LlapDaemonQueueMetrics queueMetrics;
    private static final int CVB_POOL_SIZE = 128;
    protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;

    public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount, LlapDaemonQueueMetrics queueMetrics) {
        this.downstreamConsumer = consumer;
        this.queueMetrics = queueMetrics;
        this.cvbPool = new FixedSizedObjectPool(128, (Pool.PoolObjectHelper)new Pool.PoolObjectHelper<ColumnVectorBatch>(){

            public ColumnVectorBatch create() {
                return new ColumnVectorBatch(colCount);
            }

            public void resetBeforeOffer(ColumnVectorBatch t) {
            }
        });
    }

    public void init(ConsumerFeedback<BatchType> upstreamFeedback, Callable<Void> readCallable) {
        this.upstreamFeedback = upstreamFeedback;
        this.readCallable = readCallable;
    }

    @Override
    public Callable<Void> getReadCallable() {
        return this.readCallable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void consumeData(BatchType data) {
        Object targetBatch = null;
        boolean localIsStopped = false;
        Integer targetBatchVersion = null;
        Object object = this.pendingData;
        synchronized (object) {
            localIsStopped = this.isStopped;
            if (!localIsStopped) {
                targetBatch = (EncodedColumnBatch)this.pendingData.get(data.getBatchKey());
                if (targetBatch == null) {
                    targetBatch = data;
                    this.pendingData.put(data.getBatchKey(), data);
                }
                targetBatchVersion = targetBatch.version;
            }
            this.queueMetrics.setQueueSize(this.pendingData.size());
        }
        if (localIsStopped) {
            this.returnSourceData(data);
            return;
        }
        assert (targetBatchVersion != null);
        object = targetBatch;
        synchronized (object) {
            if (targetBatch != data) {
                throw new UnsupportedOperationException("Merging is not supported");
            }
            HashMap<BatchKey, BatchType> hashMap = this.pendingData;
            synchronized (hashMap) {
                targetBatch = this.isStopped ? null : (EncodedColumnBatch)this.pendingData.remove(data.getBatchKey());
                localIsStopped = targetBatchVersion != targetBatch.version;
            }
        }
        if (localIsStopped && targetBatch != data) {
            this.returnSourceData(data);
            return;
        }
        long start = System.currentTimeMillis();
        this.decodeBatch(targetBatch, this.downstreamConsumer);
        long end = System.currentTimeMillis();
        this.queueMetrics.addProcessingTime(end - start);
        this.returnSourceData(targetBatch);
    }

    private void returnSourceData(BatchType data) {
        ++((EncodedColumnBatch)data).version;
        this.upstreamFeedback.returnData(data);
    }

    protected abstract void decodeBatch(BatchType var1, Consumer<ColumnVectorBatch> var2);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setDone() {
        HashMap<BatchKey, BatchType> hashMap = this.pendingData;
        synchronized (hashMap) {
            if (!this.pendingData.isEmpty()) {
                throw new AssertionError((Object)("Not all data has been sent downstream: " + this.pendingData.size()));
            }
        }
        this.downstreamConsumer.setDone();
    }

    public void setError(Throwable t) {
        this.downstreamConsumer.setError(t);
        this.dicardPendingData(false);
    }

    @Override
    public void returnData(ColumnVectorBatch data) {
        this.cvbPool.offer((Object)data);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dicardPendingData(boolean isStopped) {
        ArrayList<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(this.pendingData.size());
        HashMap<BatchKey, BatchType> hashMap = this.pendingData;
        synchronized (hashMap) {
            if (isStopped) {
                this.isStopped = true;
            }
            for (EncodedColumnBatch ecb : this.pendingData.values()) {
                ++ecb.version;
                batches.add(ecb);
            }
            this.pendingData.clear();
        }
        for (EncodedColumnBatch batch : batches) {
            this.upstreamFeedback.returnData(batch);
        }
    }

    @Override
    public void stop() {
        this.upstreamFeedback.stop();
        this.dicardPendingData(true);
    }

    @Override
    public void pause() {
        this.upstreamFeedback.pause();
    }

    @Override
    public void unpause() {
        this.upstreamFeedback.unpause();
    }
}

