package org.apache.drill.exec.work.filter;

import io.netty.buffer.DrillBuf;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.Consumer;
import org.apache.drill.exec.ops.SendingAccountor;
import org.apache.drill.exec.ops.StatusHandler;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/filter/RuntimeFilterSink.class */
public class RuntimeFilterSink implements Closeable {
    private Map<Integer, Integer> joinMjId2rfNumber;
    private DrillbitContext drillbitContext;
    private SendingAccountor sendingAccountor;
    private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
    private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue();
    private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap();
    private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap();
    private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap();
    private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap();
    private AtomicBoolean running = new AtomicBoolean(true);
    private AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/filter/RuntimeFilterSink$AsyncAggregateWorker.class */
    public class AsyncAggregateWorker implements Runnable {
        private AtomicBoolean over;

        private AsyncAggregateWorker() {
            this.over = new AtomicBoolean(false);
        }

        @Override // java.lang.Runnable
        public void run() {
            RuntimeFilterWritable runtimeFilterWritable;
            while (true) {
                if ((RuntimeFilterSink.this.joinMjId2rfNumber == null || !RuntimeFilterSink.this.joinMjId2rfNumber.isEmpty()) && RuntimeFilterSink.this.running.get()) {
                    synchronized (RuntimeFilterSink.this.rfQueue) {
                        try {
                            runtimeFilterWritable = (RuntimeFilterWritable) RuntimeFilterSink.this.rfQueue.poll();
                            while (runtimeFilterWritable == null && RuntimeFilterSink.this.running.get()) {
                                RuntimeFilterSink.this.rfQueue.wait();
                                runtimeFilterWritable = (RuntimeFilterWritable) RuntimeFilterSink.this.rfQueue.poll();
                            }
                        } catch (InterruptedException e) {
                            RuntimeFilterSink.logger.error("RFW_Aggregator thread being interrupted", e);
                        }
                    }
                    if (runtimeFilterWritable == null) {
                        continue;
                    } else {
                        try {
                            try {
                                RuntimeFilterSink.this.aggregate(runtimeFilterWritable);
                                if (runtimeFilterWritable != null) {
                                    runtimeFilterWritable.close();
                                }
                            } catch (Exception e2) {
                                RuntimeFilterSink.logger.error("Failed to aggregate or route the RFW", e2);
                                throw new DrillRuntimeException(e2);
                            }
                        } catch (Throwable th) {
                            if (runtimeFilterWritable != null) {
                                runtimeFilterWritable.close();
                            }
                            throw th;
                        }
                    }
                }
            }
            if (!RuntimeFilterSink.this.running.get()) {
                while (true) {
                    RuntimeFilterWritable runtimeFilterWritable2 = (RuntimeFilterWritable) RuntimeFilterSink.this.rfQueue.poll();
                    if (runtimeFilterWritable2 == null) {
                        break;
                    } else {
                        runtimeFilterWritable2.close();
                    }
                }
            }
            this.over.set(true);
        }
    }

    public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) {
        this.drillbitContext = drillbitContext;
        this.sendingAccountor = sendingAccountor;
        drillbitContext.getExecutor().submit(this.asyncAggregateWorker);
    }

    public void add(RuntimeFilterWritable runtimeFilterWritable) {
        if (!this.running.get()) {
            runtimeFilterWritable.close();
            return;
        }
        runtimeFilterWritable.retainBuffers(1);
        int majorFragmentId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
        if (this.joinMjId2Stopwatch.get(Integer.valueOf(majorFragmentId)) == null) {
            this.joinMjId2Stopwatch.put(Integer.valueOf(majorFragmentId), Stopwatch.createStarted());
        }
        synchronized (this.rfQueue) {
            this.rfQueue.add(runtimeFilterWritable);
            this.rfQueue.notify();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.running.set(false);
        if (this.asyncAggregateWorker != null) {
            synchronized (this.rfQueue) {
                this.rfQueue.notify();
            }
        }
        while (!this.asyncAggregateWorker.over.get()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e);
            }
        }
        Iterator<RuntimeFilterWritable> it = this.joinMjId2AggregatedRF.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
        int majorFragmentId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
        int intValue = this.joinMjId2rfNumber.get(Integer.valueOf(majorFragmentId)).intValue() - 1;
        this.joinMjId2rfNumber.put(Integer.valueOf(majorFragmentId), Integer.valueOf(intValue));
        RuntimeFilterWritable runtimeFilterWritable2 = this.joinMjId2AggregatedRF.get(Integer.valueOf(majorFragmentId));
        if (runtimeFilterWritable2 == null) {
            runtimeFilterWritable2 = runtimeFilterWritable;
            runtimeFilterWritable2.retainBuffers(1);
        } else {
            runtimeFilterWritable2.aggregate(runtimeFilterWritable);
        }
        this.joinMjId2AggregatedRF.put(Integer.valueOf(majorFragmentId), runtimeFilterWritable2);
        if (intValue == 0) {
            this.joinMjId2AggregatedRF.remove(Integer.valueOf(majorFragmentId));
            route(runtimeFilterWritable2);
            this.joinMjId2rfNumber.remove(Integer.valueOf(majorFragmentId));
            logger.info("received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms", Integer.valueOf(majorFragmentId), Long.valueOf(this.joinMjId2Stopwatch.get(Integer.valueOf(majorFragmentId)).elapsed(TimeUnit.MILLISECONDS)));
        }
    }

    private void route(RuntimeFilterWritable runtimeFilterWritable) {
        BitData.RuntimeFilterBDef runtimeFilterBDef = runtimeFilterWritable.getRuntimeFilterBDef();
        int majorFragmentId = runtimeFilterBDef.getMajorFragmentId();
        UserBitShared.QueryId queryId = runtimeFilterBDef.getQueryId();
        List probeFieldsList = runtimeFilterBDef.getProbeFieldsList();
        List bloomFilterSizeInBytesList = runtimeFilterBDef.getBloomFilterSizeInBytesList();
        long rfIdentifier = runtimeFilterBDef.getRfIdentifier();
        DrillBuf[] data = runtimeFilterWritable.getData();
        List<CoordinationProtos.DrillbitEndpoint> list = this.joinMjId2probeScanEps.get(Integer.valueOf(majorFragmentId));
        runtimeFilterWritable.retainBuffers(list.size() - 1);
        int intValue = this.joinMjId2ScanMjId.get(Integer.valueOf(majorFragmentId)).intValue();
        for (int i = 0; i < list.size(); i++) {
            BitData.RuntimeFilterBDef.Builder newBuilder = BitData.RuntimeFilterBDef.newBuilder();
            Iterator it = probeFieldsList.iterator();
            while (it.hasNext()) {
                newBuilder.addProbeFields((String) it.next());
            }
            new AccountingDataTunnel(this.drillbitContext.getDataConnectionsPool().getTunnel(list.get(i)), this.sendingAccountor, new StatusHandler(new Consumer<RpcException>() { // from class: org.apache.drill.exec.work.filter.RuntimeFilterSink.1
                @Override // org.apache.drill.exec.ops.Consumer
                public void accept(RpcException rpcException) {
                    RuntimeFilterSink.logger.warn("fail to broadcast a runtime filter to the probe side scan node", rpcException);
                }

                @Override // org.apache.drill.exec.ops.Consumer
                public void interrupt(InterruptedException interruptedException) {
                    RuntimeFilterSink.logger.warn("fail to broadcast a runtime filter to the probe side scan node", interruptedException);
                }
            }, this.sendingAccountor)).sendRuntimeFilter(new RuntimeFilterWritable(newBuilder.setQueryId(queryId).setMajorFragmentId(intValue).setMinorFragmentId(i).setToForeman(false).setRfIdentifier(rfIdentifier).addAllBloomFilterSizeInBytes(bloomFilterSizeInBytesList).build(), data));
        }
    }

    public void setJoinMjId2rfNumber(Map<Integer, Integer> map) {
        this.joinMjId2rfNumber = map;
    }

    public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> map) {
        this.joinMjId2probeScanEps = map;
    }

    public void setJoinMjId2ScanMjId(Map<Integer, Integer> map) {
        this.joinMjId2ScanMjId = map;
    }
}
