/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMReporter
extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
    // Identity (host/port) of this daemon; assigned in serviceStart() from localAddress, so it is null until the service starts.
    private LlapNodeId nodeId;
    // Notified with the query identifier when a heartbeat to that query's AM fails.
    private final QueryFailedHandler queryFailedHandler;
    // Configuration handed to every AMNodeInfo and used when the umbilical RPC proxy is created.
    private final Configuration conf;
    // Single-thread executor that runs the QueueLookupCallable draining pendingHeartbeatQueeu.
    private final ListeningExecutorService queueLookupExecutor;
    // Worker pool that runs the actual heartbeat and taskKilled RPC calls.
    private final ListeningExecutorService executor;
    // Retry policy built in the constructor from retryTimeout and the configured retry sleep; passed to each AMNodeInfo.
    private final RetryPolicy retryPolicy;
    // AM liveness connection timeout in milliseconds; passed to each AMNodeInfo and used as the RPC timeout for the umbilical proxy.
    private final long retryTimeout;
    // Socket factory used when creating umbilical RPC proxies.
    private final SocketFactory socketFactory;
    // Delay queue of AMs awaiting their next heartbeat, ordered by AMNodeInfo.nextHeartbeatTime.
    // NOTE(review): "Queeu" is a typo for "Queue"; the identifier is left as-is because it is referenced throughout the class.
    private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue();
    // Holder for this daemon's socket address; read in serviceStart() to derive nodeId.
    private final AtomicReference<InetSocketAddress> localAddress;
    // Interval, in milliseconds, between consecutive heartbeats to the same AM.
    private final long heartbeatInterval;
    // Set once by serviceStop(); checked by the worker callables to tell shutdown interrupts from unexpected ones.
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Registered AMs, keyed by query and then by AM node id. Every access in this class happens while holding
    // this map's own monitor (synchronized (knownAppMasters)).
    private final Map<QueryIdentifier, Map<LlapNodeId, AMNodeInfo>> knownAppMasters = new HashMap<QueryIdentifier, Map<LlapNodeId, AMNodeInfo>>();
    // Future of the queue-draining task submitted in serviceStart(); cancelled in serviceStop(). Null until the service starts.
    volatile ListenableFuture<Void> queueLookupFuture;
    // Identifier of this daemon; its unique node id is sent with every heartbeat.
    private final DaemonId daemonId;

    /**
     * Creates the reporter and its two executors; no threads do any work until {@link #serviceStart()}.
     *
     * @param numExecutors core size of the heartbeat worker pool
     * @param maxThreads maximum size of the heartbeat worker pool; raised to {@code numExecutors} (with a warning) when smaller
     * @param localAddress holder of this daemon's address, read when the service starts
     * @param queryFailedHandler callback invoked when a heartbeat to an AM fails
     * @param conf configuration used to read the heartbeat/retry timings and to create RPC proxies
     * @param daemonId identifier of this daemon
     * @param socketFactory socket factory used for umbilical RPC connections
     */
    public AMReporter(int numExecutors, int maxThreads, AtomicReference<InetSocketAddress> localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId, SocketFactory socketFactory) {
        super(AMReporter.class.getName());
        this.localAddress = localAddress;
        this.queryFailedHandler = queryFailedHandler;
        this.conf = conf;
        this.daemonId = daemonId;

        // The pool's maximum size may never be below its core size.
        int poolCeiling = maxThreads;
        if (poolCeiling < numExecutors) {
            LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors", poolCeiling, numExecutors);
            poolCeiling = numExecutors;
        }

        // Worker pool for heartbeat / taskKilled RPCs.
        ThreadPoolExecutor heartbeatPool = new ThreadPoolExecutor(
                numExecutors,
                poolCeiling,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
        this.executor = MoreExecutors.listeningDecorator(heartbeatPool);

        // Dedicated single thread that drains the pending-heartbeat queue.
        ExecutorService drainerPool = Executors.newFixedThreadPool(
                1,
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
        this.queueLookupExecutor = MoreExecutors.listeningDecorator(drainerPool);

        // All timings are read in milliseconds.
        this.heartbeatInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
        this.retryTimeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        long sleepBetweenRetriesMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, TimeUnit.MILLISECONDS);
        this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(this.retryTimeout, sleepBetweenRetriesMs, TimeUnit.MILLISECONDS);
        this.socketFactory = socketFactory;

        LOG.info("Setting up AMReporter with heartbeatInterval(ms)=" + this.heartbeatInterval + ", retryTime(ms)=" + this.retryTimeout + ", retrySleep(ms)=" + sleepBetweenRetriesMs);
    }

    /**
     * Starts the reporter: resolves this daemon's node id from {@code localAddress} and then
     * submits the queue-draining task that schedules heartbeats.
     *
     * <p>The node id is assigned <em>before</em> the drainer is submitted so that the value is
     * already set when the drainer (and the heartbeat tasks it spawns, which read {@code nodeId})
     * start running.
     */
    @Override
    public void serviceStart() {
        // Read the address once so host and port come from the same InetSocketAddress instance.
        InetSocketAddress address = this.localAddress.get();
        this.nodeId = LlapNodeId.getInstance(address.getHostName(), address.getPort());

        this.queueLookupFuture = this.queueLookupExecutor.submit(new QueueLookupCallable());
        Futures.addCallback(this.queueLookupFuture, new FutureCallback<Void>() {

            @Override
            public void onSuccess(Void result) {
                LOG.info("AMReporter QueueDrainer exited");
            }

            @Override
            public void onFailure(Throwable t) {
                if (t instanceof CancellationException && AMReporter.this.isShutdown.get()) {
                    // Expected path: serviceStop() cancels the drainer future.
                    LOG.info("AMReporter QueueDrainer exited as a result of a cancellation after shutdown");
                    return;
                }
                LOG.error("AMReporter QueueDrainer exited with error", t);
                // The drainer dying is fatal for heartbeating; escalate to the process-wide handler
                // when one is installed (it is null unless the application set one).
                Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler();
                if (handler != null) {
                    handler.uncaughtException(Thread.currentThread(), t);
                }
            }
        }, MoreExecutors.directExecutor());

        LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", this.daemonId, this.nodeId);
    }

    /**
     * Stops the reporter. Only the first call has any effect: it cancels the queue-draining
     * task (if one was started) and shuts down both executors immediately.
     */
    @Override
    public void serviceStop() {
        if (this.isShutdown.getAndSet(true)) {
            // Already stopped by an earlier call.
            return;
        }
        ListenableFuture<Void> drainer = this.queueLookupFuture;
        if (drainer != null) {
            drainer.cancel(true);
        }
        this.queueLookupExecutor.shutdownNow();
        this.executor.shutdownNow();
        LOG.info("Stopped service: " + this.getName());
    }

    /**
     * Registers a task attempt for heartbeating to the given AM.
     *
     * <p>The first attempt registered for a (query, AM) pair creates the {@link AMNodeInfo} and
     * places it on the heartbeat queue, due one heartbeat interval from now; later attempts are
     * simply added to the existing entry.
     *
     * @return the {@link AMNodeInfo} tracking the AM for this query
     * @throws RuntimeException if {@code attemptId} is already registered with that AM
     */
    public AMNodeInfo registerTask(String amLocation, int port, String umbilicalUser, Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID attemptId, boolean isGuaranteed) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Registering for heartbeat: {}, queryIdentifier={}, attemptId={}", amLocation + ":" + port, queryIdentifier, attemptId);
        }
        synchronized (this.knownAppMasters) {
            LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);

            // Look up (or lazily create) the per-query map of AMs.
            Map<LlapNodeId, AMNodeInfo> perQuery = this.knownAppMasters.get(queryIdentifier);
            if (perQuery == null) {
                perQuery = new HashMap<LlapNodeId, AMNodeInfo>();
                this.knownAppMasters.put(queryIdentifier, perQuery);
            }

            AMNodeInfo info = perQuery.get(amNodeId);
            if (info == null) {
                info = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, this.retryPolicy, this.retryTimeout, this.socketFactory, this.conf);
                perQuery.put(amNodeId, info);
                // Queued only on first registration; the drainer re-queues it after each take.
                info.setNextHeartbeatTime(System.currentTimeMillis() + this.heartbeatInterval);
                this.pendingHeartbeatQueeu.add(info);
            }
            info.addTaskAttempt(attemptId, isGuaranteed);
            return info;
        }
    }

    /**
     * Removes a task attempt from the set heartbeated to the given AM.
     *
     * <p>If no AM entry exists for the query/address the request is logged and ignored.
     *
     * @throws RuntimeException if the AM entry exists but {@code ta} was not registered with it
     */
    public void unregisterTask(String amLocation, int port, QueryIdentifier queryIdentifier, TezTaskAttemptID ta) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Un-registering for heartbeat: {}, attempt={}", amLocation + ":" + port, ta);
        }
        synchronized (this.knownAppMasters) {
            AMNodeInfo info = this.getAMNodeInfo(amLocation, port, queryIdentifier);
            if (info != null) {
                info.removeTaskAttempt(ta);
            } else {
                LOG.info("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port);
            }
        }
    }

    /**
     * Asynchronously notifies the AM that a task attempt was killed.
     *
     * <p>If the AM is currently registered for the query, its existing {@link AMNodeInfo} (and
     * cached umbilical proxy) is used. Otherwise a temporary {@link AMNodeInfo} is created just
     * for this one call; its umbilical proxy is stopped once the call has finished so that the
     * underlying RPC client is not leaked.
     *
     * @param amLocation host of the AM
     * @param port port of the AM
     * @param umbilicalUser user to connect as when a temporary connection is needed
     * @param jobToken token used to authenticate a temporary connection
     * @param queryIdentifier query the attempt belongs to
     * @param taskAttemptId the attempt that was killed
     */
    public void taskKilled(String amLocation, int port, String umbilicalUser, Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
        LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
        final AMNodeInfo target;
        final boolean temporary;
        synchronized (this.knownAppMasters) {
            AMNodeInfo registered = this.getAMNodeInfo(amLocation, port, queryIdentifier);
            temporary = registered == null;
            target = temporary
                    ? new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, this.retryPolicy, this.retryTimeout, this.socketFactory, this.conf)
                    : registered;
        }
        ListenableFuture<Void> future = this.executor.submit(new KillTaskCallable(taskAttemptId, target));
        Futures.addCallback(future, new FutureCallback<Void>() {

            @Override
            public void onSuccess(Void result) {
                LOG.info("Sent taskKilled for {}", taskAttemptId);
                this.releaseTemporary();
            }

            @Override
            public void onFailure(Throwable t) {
                // Trailing Throwable is logged by SLF4J as the exception, preserving the cause.
                LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.", taskAttemptId, t);
                this.releaseTemporary();
            }

            /** Stops the proxy of a one-off AMNodeInfo; registered entries are owned by the heartbeat loop. */
            private void releaseTemporary() {
                if (temporary) {
                    target.stopUmbilical();
                }
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Handles completion of a query: drops every AM registered for it and marks each as done.
     * A {@code null} identifier is ignored.
     *
     * @param queryIdentifier the completed query, may be {@code null}
     */
    public void queryComplete(QueryIdentifier queryIdentifier) {
        if (queryIdentifier == null) {
            return;
        }
        synchronized (this.knownAppMasters) {
            LOG.debug("Query complete received for {}", queryIdentifier);
            Map<LlapNodeId, AMNodeInfo> removed = this.knownAppMasters.remove(queryIdentifier);
            if (removed == null) {
                return;
            }
            LOG.debug("Removed following AMs due to query complete:");
            for (AMNodeInfo info : removed.values()) {
                // The drainer checks this flag when it next takes the entry off the queue.
                info.setIsDone(true);
                LOG.debug(info.toString());
            }
        }
    }

    /**
     * Creates an RPC proxy to the AM described by {@code amNodeInfo}.
     *
     * <p>The job token is bound to the AM's socket address and attached to a remote-user UGI for
     * the umbilical user; the proxy is then created inside that UGI's {@code doAs}.
     *
     * @param amNodeInfo the AM to connect to
     * @return a new umbilical proxy
     * @throws IOException if thrown by {@code doAs} while creating the proxy
     * @throws InterruptedException if thrown by {@code doAs} while creating the proxy
     */
    protected LlapTaskUmbilicalProtocol createUmbilical(final AMNodeInfo amNodeInfo) throws IOException, InterruptedException {
        final LlapNodeId amNode = amNodeInfo.amNodeId;
        final InetSocketAddress amAddress = NetUtils.createSocketAddrForHost(amNode.getHostname(), amNode.getPort());
        SecurityUtil.setTokenService(amNodeInfo.jobToken, amAddress);

        UserGroupInformation umbilicalUgi = UserGroupInformation.createRemoteUser(amNodeInfo.umbilicalUser);
        umbilicalUgi.addToken(amNodeInfo.jobToken);

        PrivilegedExceptionAction<LlapTaskUmbilicalProtocol> connect = new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {

            @Override
            public LlapTaskUmbilicalProtocol run() throws Exception {
                // The configured timeout is a long; the RPC API takes an int.
                int rpcTimeout = (int) amNodeInfo.timeout;
                return RPC.getProxy(LlapTaskUmbilicalProtocol.class, 1L, amAddress, UserGroupInformation.getCurrentUser(), amNodeInfo.conf, amNodeInfo.socketFactory, rpcTimeout);
            }
        };
        return umbilicalUgi.doAs(connect);
    }

    /**
     * Looks up the registered AM entry for a query and AM address.
     * It does no locking of its own; the callers in this class invoke it while holding the
     * monitor of {@code knownAppMasters}.
     *
     * @return the matching {@link AMNodeInfo}, or {@code null} if the query or the AM is unknown
     */
    private AMNodeInfo getAMNodeInfo(String amHost, int amPort, QueryIdentifier queryId) {
        Map<LlapNodeId, AMNodeInfo> perQuery = this.knownAppMasters.get(queryId);
        if (perQuery == null) {
            return null;
        }
        return perQuery.get(LlapNodeId.getInstance(amHost, amPort));
    }

    /**
     * Point-in-time copy of the attempts registered with an AM, held as two parallel lists:
     * {@code guaranteed.get(i)} is the flag recorded for {@code attempts.get(i)}.
     */
    private static final class TaskSnapshot {
        /** Attempt ids, in the order they were copied. */
        public final List<TezTaskAttemptID> attempts;
        /** Guaranteed flags, index-aligned with {@link #attempts}. */
        public final List<BooleanWritable> guaranteed;

        /**
         * @param expectedSize initial capacity for both lists
         */
        public TaskSnapshot(int expectedSize) {
            this.guaranteed = new ArrayList<BooleanWritable>(expectedSize);
            this.attempts = new ArrayList<TezTaskAttemptID>(expectedSize);
        }
    }

    /**
     * Heartbeat state for one AM of one query: connection parameters, the lazily created
     * umbilical proxy, the registered task attempts and the time of the next heartbeat.
     *
     * <p>Implements {@link Delayed} so instances can sit in the pending-heartbeat
     * {@link DelayQueue}, ordered by {@code nextHeartbeatTime}.
     */
    protected class AMNodeInfo
    implements Delayed {
        // --- immutable connection / identity data ---
        private final LlapNodeId amNodeId;
        private final QueryIdentifier queryIdentifier;
        private final String umbilicalUser;
        private final Token<JobTokenIdentifier> jobToken;
        private final Configuration conf;
        private final SocketFactory socketFactory;
        private final RetryPolicy retryPolicy;
        private final long timeout;

        // --- mutable state ---
        /** Registered attempts mapped to their "guaranteed" flag. */
        private final ConcurrentHashMap<TezTaskAttemptID, Boolean> tasks = new ConcurrentHashMap<TezTaskAttemptID, Boolean>();
        private final AtomicBoolean amFailed = new AtomicBoolean(false);
        private final AtomicBoolean isDone = new AtomicBoolean(false);
        /** Created on first use; guarded by this object's monitor. */
        private LlapTaskUmbilicalProtocol umbilical;
        /** Epoch milliseconds at which the next heartbeat becomes due. */
        private long nextHeartbeatTime;

        public AMNodeInfo(LlapNodeId amNodeId, String umbilicalUser, Token<JobTokenIdentifier> jobToken, QueryIdentifier currentQueryIdentifier, RetryPolicy retryPolicy, long timeout, SocketFactory socketFactory, Configuration conf) {
            this.amNodeId = amNodeId;
            this.queryIdentifier = currentQueryIdentifier;
            this.umbilicalUser = umbilicalUser;
            this.jobToken = jobToken;
            this.conf = conf;
            this.socketFactory = socketFactory;
            this.retryPolicy = retryPolicy;
            this.timeout = timeout;
        }

        /** Returns the umbilical proxy, creating it via {@code createUmbilical} on first call. */
        synchronized LlapTaskUmbilicalProtocol getUmbilical() throws IOException, InterruptedException {
            LlapTaskUmbilicalProtocol current = this.umbilical;
            if (current == null) {
                current = AMReporter.this.createUmbilical(this);
                this.umbilical = current;
            }
            return current;
        }

        /** Stops the proxy if one exists and forgets it, so a later {@link #getUmbilical()} creates a new one. */
        synchronized void stopUmbilical() {
            LlapTaskUmbilicalProtocol current = this.umbilical;
            if (current != null) {
                RPC.stopProxy(current);
            }
            this.umbilical = null;
        }

        /**
         * Registers an attempt.
         *
         * @throws RuntimeException if the attempt is already registered
         */
        void addTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) {
            if (this.tasks.putIfAbsent(attemptId, isGuaranteed) != null) {
                throw new RuntimeException(attemptId + " was already registered");
            }
        }

        /** Updates the guaranteed flag of a registered attempt; warns if the attempt is unknown. */
        void updateTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) {
            if (this.tasks.replace(attemptId, isGuaranteed) != null) {
                return;
            }
            LOG.warn("Task " + attemptId + " is no longer registered");
            this.tasks.remove(attemptId);
        }

        /**
         * Unregisters an attempt.
         *
         * @throws RuntimeException if the attempt was not registered
         */
        void removeTaskAttempt(TezTaskAttemptID attemptId) {
            if (this.tasks.remove(attemptId) == null) {
                throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
            }
        }

        void setAmFailed(boolean val) {
            this.amFailed.set(val);
        }

        boolean hasAmFailed() {
            return this.amFailed.get();
        }

        void setIsDone(boolean val) {
            this.isDone.set(val);
        }

        boolean isDone() {
            return this.isDone.get();
        }

        /** Copies the currently registered attempts and their flags into a new {@link TaskSnapshot}. */
        TaskSnapshot getTasksSnapshot() {
            TaskSnapshot snapshot = new TaskSnapshot(this.tasks.size());
            for (Map.Entry<TezTaskAttemptID, Boolean> entry : this.tasks.entrySet()) {
                boolean guaranteedFlag = entry.getValue();
                snapshot.attempts.add(entry.getKey());
                snapshot.guaranteed.add(new BooleanWritable(guaranteedFlag));
            }
            return snapshot;
        }

        public QueryIdentifier getQueryIdentifier() {
            return this.queryIdentifier;
        }

        synchronized void setNextHeartbeatTime(long nextTime) {
            this.nextHeartbeatTime = nextTime;
        }

        /** Remaining time until the next heartbeat, converted from milliseconds to {@code unit}. */
        @Override
        public long getDelay(TimeUnit unit) {
            long remainingMs = this.nextHeartbeatTime - System.currentTimeMillis();
            return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
        }

        /** Orders by next heartbeat time; {@code o} must be another {@link AMNodeInfo}. */
        @Override
        public int compareTo(Delayed o) {
            long mine = this.nextHeartbeatTime;
            long theirs = ((AMNodeInfo) o).nextHeartbeatTime;
            return mine > theirs ? 1 : (mine < theirs ? -1 : 0);
        }

        @Override
        public String toString() {
            return "AMInfo: " + this.amNodeId + ", taskCount=" + this.getTaskCount() + ", queryIdentifier=" + this.queryIdentifier;
        }

        /** Number of attempts currently registered. */
        private int getTaskCount() {
            // NOTE(review): the monitor on the map is kept from the original code; nothing else in
            // this file locks on it, so it looks redundant — confirm before removing.
            synchronized (this.tasks) {
                return this.tasks.size();
            }
        }
    }

    private class AMHeartbeatCallable
    extends CallableWithNdc<Void> {
        final AMNodeInfo amNodeInfo;

        public AMHeartbeatCallable(AMNodeInfo amNodeInfo) {
            this.amNodeInfo = amNodeInfo;
        }

        protected Void callInternal() {
            block6: {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Attempting to heartbeat to AM: " + this.amNodeInfo);
                }
                TaskSnapshot tasks = this.amNodeInfo.getTasksSnapshot();
                if (tasks.attempts.isEmpty()) {
                    return null;
                }
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("NodeHeartbeat to: " + this.amNodeInfo);
                    }
                    LlapTaskUmbilicalProtocol.TezAttemptArray aw = new LlapTaskUmbilicalProtocol.TezAttemptArray();
                    aw.set((Writable[])tasks.attempts.toArray(new TezTaskAttemptID[tasks.attempts.size()]));
                    LlapTaskUmbilicalProtocol.BooleanArray guaranteed = new LlapTaskUmbilicalProtocol.BooleanArray();
                    guaranteed.set((Writable[])tasks.guaranteed.toArray(new BooleanWritable[tasks.guaranteed.size()]));
                    this.amNodeInfo.getUmbilical().nodeHeartbeat(new Text(AMReporter.this.nodeId.getHostname()), new Text(AMReporter.this.daemonId.getUniqueNodeIdInCluster()), AMReporter.this.nodeId.getPort(), aw, guaranteed);
                }
                catch (IOException e) {
                    QueryIdentifier currentQueryIdentifier = this.amNodeInfo.getQueryIdentifier();
                    this.amNodeInfo.setAmFailed(true);
                    LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}", new Object[]{this.amNodeInfo.amNodeId, currentQueryIdentifier, e});
                    AMReporter.this.queryFailedHandler.queryFailed(currentQueryIdentifier);
                }
                catch (InterruptedException e) {
                    if (AMReporter.this.isShutdown.get()) break block6;
                    LOG.warn("Interrupted while trying to send heartbeat to AM {}", (Object)this.amNodeInfo.amNodeId, (Object)e);
                }
            }
            return null;
        }
    }

    private class KillTaskCallable
    extends CallableWithNdc<Void> {
        final AMNodeInfo amNodeInfo;
        final TezTaskAttemptID taskAttemptId;

        public KillTaskCallable(TezTaskAttemptID taskAttemptId, AMNodeInfo amNodeInfo) {
            this.taskAttemptId = taskAttemptId;
            this.amNodeInfo = amNodeInfo;
        }

        protected Void callInternal() {
            block3: {
                try {
                    this.amNodeInfo.getUmbilical().taskKilled(this.taskAttemptId);
                }
                catch (IOException e) {
                    LOG.warn("Failed to send taskKilled message for task {}. Will re-run after it times out", (Object)this.taskAttemptId);
                }
                catch (InterruptedException e) {
                    if (AMReporter.this.isShutdown.get()) break block3;
                    LOG.info("Interrupted while trying to send taskKilled message for task {}", (Object)this.taskAttemptId);
                }
            }
            return null;
        }
    }

    private class QueueLookupCallable
    extends CallableWithNdc<Void> {
        private QueueLookupCallable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected Void callInternal() {
            while (!AMReporter.this.isShutdown.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    final AMNodeInfo amNodeInfo = (AMNodeInfo)AMReporter.this.pendingHeartbeatQueeu.take();
                    if (amNodeInfo.hasAmFailed() || amNodeInfo.isDone()) {
                        Map<QueryIdentifier, Map<LlapNodeId, AMNodeInfo>> map = AMReporter.this.knownAppMasters;
                        synchronized (map) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}, isDone={}", new Object[]{amNodeInfo.amNodeId, amNodeInfo.getQueryIdentifier(), amNodeInfo.getTaskCount(), amNodeInfo.hasAmFailed(), amNodeInfo.isDone()});
                            }
                            AMReporter.this.knownAppMasters.remove(amNodeInfo.getQueryIdentifier());
                            continue;
                        }
                    }
                    long next = System.currentTimeMillis() + AMReporter.this.heartbeatInterval;
                    amNodeInfo.setNextHeartbeatTime(next);
                    AMReporter.this.pendingHeartbeatQueeu.add(amNodeInfo);
                    if (amNodeInfo.getTaskCount() <= 0) continue;
                    ListenableFuture future = AMReporter.this.executor.submit((Callable)((Object)new AMHeartbeatCallable(amNodeInfo)));
                    Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Void>(){

                        public void onSuccess(Void result) {
                        }

                        public void onFailure(Throwable t) {
                            QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier();
                            amNodeInfo.setAmFailed(true);
                            LOG.warn("Heartbeat failed to AM {}. Marking query as failed. query={}", new Object[]{amNodeInfo.amNodeId, currentQueryIdentifier, t});
                            AMReporter.this.queryFailedHandler.queryFailed(currentQueryIdentifier);
                        }
                    }, (Executor)MoreExecutors.directExecutor());
                }
                catch (InterruptedException e) {
                    if (AMReporter.this.isShutdown.get()) {
                        LOG.info("QueueLookup thread interrupted after shutdown");
                        continue;
                    }
                    LOG.warn("Received unexpected interrupt while waiting on heartbeat queue");
                }
            }
            return null;
        }
    }
}

