package org.apache.kafka.trogdor.coordinator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.SendingState;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.apache.kafka.trogdor.rest.FaultDataMap;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/coordinator/Coordinator.class */
/**
 * The Trogdor fault injection coordinator.
 *
 * <p>Keeps track of the faults that have been submitted and runs one service thread which,
 * once a fault's start time has been reached, hands the fault to the {@link NodeManager} of
 * every node the fault targets.
 *
 * <p>The {@code shutdown} flag and the {@code pendingFaults} / {@code processedFaults} sets are
 * only read or modified while {@code lock} is held.
 */
public final class Coordinator {
    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);

    /** Longest time the service thread sleeps before re-checking the pending faults: one hour. */
    private static final long MAX_SLEEP_MS = 60L * 60L * 1000L;

    private final Time time;
    private final long startTimeMs;
    private final Platform platform;
    private final JsonRestServer restServer;
    private final KafkaThread thread;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cond = this.lock.newCondition();
    private boolean shutdown = false;
    private final FaultSet pendingFaults = new FaultSet();
    private final FaultSet processedFaults = new FaultSet();
    private final CoordinatorRunnable runnable = new CoordinatorRunnable();
    private final Map<String, NodeManager> nodeManagers = new HashMap<>();

    /**
     * Body of the coordinator's service thread.
     */
    class CoordinatorRunnable implements Runnable {
        CoordinatorRunnable() {
        }

        /**
         * Loops until shutdown is requested. Each iteration moves every pending fault whose
         * start time has been reached into the processed set and starts it, then sleeps until
         * the next pending fault is due (at most {@link #MAX_SLEEP_MS}) or until signalled.
         *
         * <p>However the loop ends, the REST server is stopped and every node manager is shut
         * down exactly once.
         */
        @Override
        public void run() {
            log.info("Starting main service thread.");
            try {
                long nextWakeMs = 0;
                while (true) {
                    long now = time.milliseconds();
                    ArrayList<Fault> toStart = new ArrayList<>();
                    lock.lock();
                    try {
                        if (shutdown) {
                            break;
                        }
                        if (nextWakeMs > now) {
                            // await() returns true when we were woken before the timeout elapsed.
                            if (cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
                                log.trace("CoordinatorRunnable woke up early.");
                            }
                            now = time.milliseconds();
                            if (shutdown) {
                                break;
                            }
                        }
                        nextWakeMs = now + MAX_SLEEP_MS;
                        FaultSet.FaultSetIterator iter = pendingFaults.iterateByStart();
                        while (iter.hasNext()) {
                            Fault fault = iter.next();
                            if (now < fault.spec().startMs()) {
                                // Not due yet: wake up when it is, and stop scanning.
                                nextWakeMs = Math.min(nextWakeMs, fault.spec().startMs());
                                break;
                            }
                            toStart.add(fault);
                            iter.remove();
                            processedFaults.add(fault);
                        }
                    } finally {
                        // The lock covers only the section above, so it is released exactly
                        // once on every path (normal exit, break, or exception).
                        lock.unlock();
                    }
                    // Start the faults after releasing the lock.
                    for (Fault fault : toStart) {
                        startFault(now, fault);
                    }
                }
            } catch (Throwable t) {
                log.error("CoordinatorRunnable shutting down with exception", t);
            } finally {
                log.info("CoordinatorRunnable shutting down.");
                restServer.stop();
                // Ask every node manager to stop before waiting on any of them.
                for (NodeManager nodeManager : nodeManagers.values()) {
                    nodeManager.beginShutdown();
                }
                for (NodeManager nodeManager : nodeManagers.values()) {
                    nodeManager.waitForShutdown();
                }
            }
        }
    }

    /**
     * Creates the coordinator and starts its service thread.
     *
     * <p>A {@link NodeManager} is created for each node in the platform topology whose Trogdor
     * agent port is greater than zero; a warning is logged when there is none.
     *
     * @param platform the platform whose topology supplies the nodes
     * @param time the clock used for timestamps
     * @param jsonRestServer the REST server; stopped when the service thread exits
     * @param coordinatorRestResource the REST resource that is given a reference to this object
     */
    public Coordinator(Platform platform, Time time, JsonRestServer jsonRestServer,
                       CoordinatorRestResource coordinatorRestResource) {
        this.platform = platform;
        this.time = time;
        this.startTimeMs = time.milliseconds();
        this.restServer = jsonRestServer;
        for (Node node : platform.topology().nodes().values()) {
            if (Node.Util.getTrogdorAgentPort(node) > 0) {
                this.nodeManagers.put(node.name(), new NodeManager(time, node));
            }
        }
        if (this.nodeManagers.isEmpty()) {
            log.warn("No agent nodes configured.");
        }
        this.thread = new KafkaThread("TrogdorCoordinatorThread", this.runnable, false);
        this.thread.start();
        coordinatorRestResource.setCoordinator(this);
    }

    /**
     * @return the port reported by the REST server
     */
    public int port() {
        return this.restServer.port();
    }

    /**
     * Starts a fault: sets its state and enqueues it on the node manager of each target node.
     *
     * <p>Target nodes without a node manager are reported in a warning and skipped. If no
     * target node has a node manager, the fault goes straight to {@link DoneState}; otherwise
     * it is put into {@link SendingState} for the nodes that were found.
     *
     * @param now the current time in milliseconds, recorded in the done state
     * @param fault the fault to start
     */
    public void startFault(long now, Fault fault) {
        Set<String> targetNodes = fault.targetNodes(this.platform.topology());
        Set<NodeManager> targetManagers = new HashSet<>();
        Set<String> nonExistentNodes = new HashSet<>();
        Set<String> nodeNames = new HashSet<>();
        for (String targetNode : targetNodes) {
            NodeManager nodeManager = this.nodeManagers.get(targetNode);
            if (nodeManager == null) {
                nonExistentNodes.add(targetNode);
            } else {
                targetManagers.add(nodeManager);
                nodeNames.add(targetNode);
            }
        }
        if (!nonExistentNodes.isEmpty()) {
            log.warn("Fault {} refers to {} non-existent node(s): {}",
                new Object[] {fault.id(), nonExistentNodes.size(), Utils.join(nonExistentNodes, ", ")});
        }
        log.info("Applying fault {} on {} node(s): {}",
            new Object[] {fault.id(), nodeNames.size(), Utils.join(nodeNames, ", ")});
        if (nodeNames.isEmpty()) {
            fault.setState(new DoneState(now, ""));
        } else {
            fault.setState(new SendingState(nodeNames));
        }
        for (NodeManager nodeManager : targetManagers) {
            nodeManager.enqueueFault(fault);
        }
    }

    /**
     * Sets the shutdown flag and wakes the service thread. Does not wait for it to exit;
     * see {@link #waitForShutdown()}.
     */
    public void beginShutdown() {
        this.lock.lock();
        try {
            this.shutdown = true;
            this.cond.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Blocks until the service thread has exited. If the calling thread is interrupted while
     * waiting, the interruption is logged and the interrupt flag is set again.
     */
    public void waitForShutdown() {
        try {
            this.thread.join();
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for thread shutdown", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the clock reading, in milliseconds, taken when this coordinator was constructed
     */
    public long startTimeMs() {
        return this.startTimeMs;
    }

    /**
     * Collects the spec and state of every pending and processed fault, keyed by fault id.
     *
     * @return a response wrapping the collected fault data
     */
    public CoordinatorFaultsResponse getFaults() {
        TreeMap<String, FaultDataMap.FaultData> faultData = new TreeMap<>();
        this.lock.lock();
        try {
            getFaultsImpl(faultData, this.pendingFaults);
            getFaultsImpl(faultData, this.processedFaults);
        } finally {
            this.lock.unlock();
        }
        return new CoordinatorFaultsResponse(faultData);
    }

    /**
     * Adds an entry to {@code map} for each fault in {@code faultSet}, keyed by fault id.
     */
    private void getFaultsImpl(Map<String, FaultDataMap.FaultData> map, FaultSet faultSet) {
        FaultSet.FaultSetIterator iter = faultSet.iterateByStart();
        while (iter.hasNext()) {
            Fault fault = iter.next();
            map.put(fault.id(), new FaultDataMap.FaultData(fault.spec(), fault.state()));
        }
    }

    /**
     * Creates a fault from the request, adds it to the pending set and wakes the service
     * thread so that it can re-evaluate when to wake up next.
     *
     * @param createCoordinatorFaultRequest the request carrying the fault id and spec
     * @throws ClassNotFoundException propagated from {@code FaultSpec.Util.createFault}
     */
    public void createFault(CreateCoordinatorFaultRequest createCoordinatorFaultRequest)
            throws ClassNotFoundException {
        this.lock.lock();
        try {
            Fault fault = FaultSpec.Util.createFault(
                createCoordinatorFaultRequest.id(), createCoordinatorFaultRequest.spec());
            this.pendingFaults.add(fault);
            this.cond.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Command-line entry point. Requires {@code --coordinator.config} and {@code --node-name},
     * starts a coordinator with its REST server, installs a JVM shutdown hook that stops the
     * coordinator, and then blocks until the coordinator's service thread exits.
     */
    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("trogdor-coordinator")
            .defaultHelp(true)
            .description("The Trogdor fault injection coordinator");
        parser.addArgument(new String[] {"--coordinator.config"})
            .action(Arguments.store())
            .required(true)
            .type(String.class)
            .dest("config")
            .metavar(new String[] {"CONFIG"})
            .help("The configuration file to use.");
        parser.addArgument(new String[] {"--node-name"})
            .action(Arguments.store())
            .required(true)
            .type(String.class)
            .dest("node_name")
            .metavar(new String[] {"NODE_NAME"})
            .help("The name of this node.");
        Namespace namespace;
        try {
            namespace = parser.parseArgs(args);
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
            // Exit.exit is not expected to return; if it ever does, stop here instead of
            // continuing without parsed arguments.
            return;
        }
        Platform platform = Platform.Config.parse(
            namespace.getString("node_name"), namespace.getString("config"));
        JsonRestServer restServer =
            new JsonRestServer(Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
        CoordinatorRestResource resource = new CoordinatorRestResource();
        final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM, restServer, resource);
        restServer.start(resource);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                log.error("Running shutdown hook...");
                // main is static, so there is no enclosing Coordinator instance here;
                // the hook has to act on the captured local.
                coordinator.beginShutdown();
                coordinator.waitForShutdown();
            }
        });
        coordinator.waitForShutdown();
    }
}
