package org.apache.kafka.trogdor.coordinator;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.SendingState;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/trogdor/coordinator/NodeManager.class */
/**
 * Manages the coordinator's communication with a single agent node.
 *
 * <p>Each instance owns one background thread, started by the constructor, which
 * delivers queued {@link Fault}s to the node through an {@link AgentClient} and
 * polls the agent's status ("heartbeat") whenever {@code heartbeatMs} milliseconds
 * have passed since the last communication attempt.
 *
 * <p>The mutable fields {@code shutdown}, {@code faultQueue} and
 * {@code lastContactMs} are only read or written while holding {@code lock}.
 * Network calls are always made without holding the lock.
 */
public class NodeManager {
    private static final Logger log = LoggerFactory.getLogger(NodeManager.class);

    /** Clock used to timestamp communication attempts. */
    private final Time time;

    /** The node this manager talks to. */
    private final Node node;

    /** Client used to reach the agent running on the node. */
    private final AgentClient client;

    /** Maximum time, in milliseconds, between communication attempts. */
    private final long heartbeatMs;

    /** The background thread executing {@link NodeManagerRunnable}. */
    private final KafkaThread thread;

    /** Set once shutdown has been requested. Guarded by {@code lock}. */
    private boolean shutdown = false;

    private final Lock lock = new ReentrantLock();

    /** Signalled when a fault is enqueued or shutdown begins. */
    private final Condition cond = lock.newCondition();

    /** Faults waiting to be sent, oldest first. Guarded by {@code lock}. */
    private final List<Fault> faultQueue = new ArrayList<>();

    /** Time of the last successful exchange with the agent. Guarded by {@code lock}. */
    private long lastContactMs = 0;

    private final NodeManagerRunnable runnable = new NodeManagerRunnable();

    /**
     * The body of the manager thread: sends pending faults, sends heartbeats
     * when they are due, and sleeps on {@code cond} in between.
     */
    class NodeManagerRunnable implements Runnable {
        @Override
        public void run() {
            try {
                Fault pendingFault = null;
                long lastCommAttemptMs = 0;
                while (true) {
                    long now = time.milliseconds();
                    if (pendingFault != null) {
                        // Sending a fault counts as a communication attempt whether
                        // or not it succeeds; a failed fault is retried next round.
                        lastCommAttemptMs = now;
                        if (sendFault(now, pendingFault)) {
                            pendingFault = null;
                        }
                    }
                    // Heartbeat only once a full interval has elapsed without any
                    // communication attempt.
                    if (now - lastCommAttemptMs >= heartbeatMs) {
                        lastCommAttemptMs = now;
                        sendHeartbeat(now);
                    }
                    long waitMs = Math.max(0L, lastCommAttemptMs + heartbeatMs - now);

                    lock.lock();
                    try {
                        if (shutdown) {
                            return;
                        }
                        // A fault may have been enqueued (and its signal missed) while
                        // this thread was busy talking to the agent. Only wait when
                        // there is nothing we could pick up right now.
                        boolean faultReady = pendingFault == null && !faultQueue.isEmpty();
                        if (!faultReady) {
                            try {
                                cond.await(waitMs, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException e) {
                                log.info("{}: NodeManagerRunnable got InterruptedException", node.name());
                                Thread.currentThread().interrupt();
                            }
                        }
                        if (pendingFault == null && !faultQueue.isEmpty()) {
                            pendingFault = faultQueue.remove(0);
                        }
                    } finally {
                        // Single release point: unlocking twice would throw
                        // IllegalMonitorStateException and kill this thread.
                        lock.unlock();
                    }
                }
            } catch (Throwable th) {
                log.warn("{}: exiting NodeManagerRunnable with exception", node.name(), th);
            }
        }
    }

    /**
     * An immutable snapshot of a node's name and the time it was last contacted.
     */
    public static class NodeStatus {
        private final String nodeName;
        private final long lastContactMs;

        NodeStatus(String nodeName, long lastContactMs) {
            this.nodeName = nodeName;
            this.lastContactMs = lastContactMs;
        }

        /** @return the name of the node. */
        public String nodeName() {
            return nodeName;
        }

        /** @return the last contact time captured in this snapshot, in milliseconds. */
        public long lastContactMs() {
            return lastContactMs;
        }
    }

    /**
     * Creates the manager for a node and immediately starts its background thread.
     *
     * @param time the clock used to timestamp communication attempts.
     * @param node the node to manage; its hostname, agent port and heartbeat
     *             configuration are read here.
     */
    public NodeManager(Time time, Node node) {
        this.time = time;
        this.node = node;
        this.client = new AgentClient(node.hostname(), Node.Util.getTrogdorAgentPort(node));
        this.heartbeatMs = Node.Util.getIntConfig(node, Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS, Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT);
        this.thread = new KafkaThread("NodeManagerThread(" + node.name() + ")", this.runnable, false);
        this.thread.start();
    }

    /**
     * Records a successful exchange with the agent.
     *
     * @param now the time of the exchange, in milliseconds.
     */
    private void recordContact(long now) {
        lock.lock();
        try {
            lastContactMs = now;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Sends a fault to the agent and, on success, updates the last contact time
     * and the fault's state.
     *
     * <p>If the fault's {@link SendingState#completeSend} returns {@code true} for
     * this node, the fault is moved to a {@link DoneState}.
     *
     * @param now   the current time in milliseconds.
     * @param fault the fault to send; its state must be a {@link SendingState}.
     * @return {@code true} if the fault was delivered and processed; {@code false}
     *         if any exception occurred (it is logged, not propagated).
     */
    public boolean sendFault(long now, Fault fault) {
        try {
            client.putFault(new CreateAgentFaultRequest(fault.id(), fault.spec()));
            recordContact(now);
            SendingState sendingState = (SendingState) fault.state();
            if (sendingState.completeSend(node.name())) {
                fault.setState(new DoneState(now, ""));
            }
            return true;
        } catch (Exception e) {
            log.warn("{}: error sending fault to {}.", node.name(), client.target(), e);
            return false;
        }
    }

    /**
     * Requests the agent's status and, on success, updates the last contact time.
     * Failures are logged and otherwise ignored.
     *
     * @param now the current time in milliseconds.
     */
    public void sendHeartbeat(long now) {
        try {
            AgentStatusResponse status = client.getStatus();
            recordContact(now);
            log.debug("{}: got heartbeat status {}.", node.name(), status);
        } catch (Exception e) {
            log.warn("{}: error sending heartbeat to {}.", node.name(), client.target(), e);
        }
    }

    /**
     * Asks the manager thread to stop and wakes it if it is waiting.
     * Calling this more than once has no further effect.
     */
    public void beginShutdown() {
        lock.lock();
        try {
            if (shutdown) {
                return;
            }
            log.trace("{}: beginning shutdown.", node.name());
            shutdown = true;
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the manager thread has terminated. If the calling thread is
     * interrupted while waiting, the interruption is logged and the interrupt
     * flag is restored.
     */
    public void waitForShutdown() {
        log.trace("waiting for NodeManager({}) shutdown.", node.name());
        try {
            thread.join();
        } catch (InterruptedException e) {
            log.error("{}: Interrupted while waiting for thread shutdown", node.name(), e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return a snapshot of the node name and the last contact time.
     */
    public NodeStatus status() {
        lock.lock();
        try {
            return new NodeStatus(node.name(), lastContactMs);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends a fault to the send queue and wakes the manager thread.
     *
     * @param fault the fault to deliver to this node.
     */
    public void enqueueFault(Fault fault) {
        lock.lock();
        try {
            log.trace("{}: added {} to fault queue.", node.name(), fault);
            faultQueue.add(fault);
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
