package org.apache.kafka.connect.integration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.class */
public class RebalanceSourceConnectorsIntegrationTest {
    /** Class-wide logger; also handed to the test watcher rule below. */
    private static final Logger log = LoggerFactory.getLogger(RebalanceSourceConnectorsIntegrationTest.class);

    // Sizing constants for the cluster, topics and connectors used by the tests.
    private static final int NUM_TOPIC_PARTITIONS = 3;
    private static final int NUM_WORKERS = 3;
    private static final int NUM_TASKS = 4;

    // Base connector name and the topic the default connector configuration writes to.
    private static final String CONNECTOR_NAME = "seq-source1";
    private static final String TOPIC_NAME = "sequential-topic";

    // Upper bounds (in milliseconds) used when waiting on connector restarts and on assignment checks.
    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);

    /** JUnit rule built from the class logger. */
    @Rule
    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);

    /** Embedded Connect cluster; created in {@code setup()} and stopped in {@code close()}. */
    private EmbeddedConnectCluster connect;

    /**
     * Builds and starts a fresh embedded Connect cluster before every test.
     *
     * <p>The workers are configured with the {@code COMPATIBLE} Connect protocol, a 30 second
     * offset flush interval and a 30 second scheduled rebalance delay. The single broker has
     * automatic topic creation disabled, so each test creates the topics it needs explicitly.
     *
     * @throws ClassNotFoundException declared for compatibility with the existing signature
     */
    @Before
    public void setup() throws ClassNotFoundException {
        // Typed map: the builder expects String-to-String worker properties.
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("connect.protocol", ConnectProtocolCompatibility.COMPATIBLE.toString());
        workerProps.put("offset.flush.interval.ms", String.valueOf(TimeUnit.SECONDS.toMillis(30L)));
        workerProps.put("scheduled.rebalance.max.delay.ms", String.valueOf(TimeUnit.SECONDS.toMillis(30L)));

        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");

        connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
                .numBrokers(1)
                .workerProps(workerProps)
                .brokerProps(brokerProps)
                .build();
        connect.start();
    }

    /**
     * Stops the embedded Connect cluster after every test.
     *
     * <p>JUnit runs {@code @After} methods even when the {@code @Before} method failed, in which
     * case the cluster may never have been assigned. The null check keeps that situation from
     * adding a {@link NullPointerException} on top of the original setup failure.
     */
    @After
    public void close() {
        if (connect != null) {
            connect.stop();
        }
    }

    /**
     * Starts two source connectors with identical properties, one after the other, and checks
     * that each ends up running with at least {@code NUM_TASKS} tasks.
     */
    @Test
    public void testStartTwoConnectors() throws Exception {
        final String secondConnector = "another-source";
        final String tasksNotStarted = "Connector tasks did not start in time.";

        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Connect workers did not start in time.");

        // First connector on its own.
        connect.configureConnector(CONNECTOR_NAME, props);
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, tasksNotStarted);

        // Add the second connector, then re-check both of them.
        connect.configureConnector(secondConnector, props);
        for (String name : new String[] {CONNECTOR_NAME, secondConnector}) {
            connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(name, NUM_TASKS, tasksNotStarted);
        }
    }

    /**
     * Reconfigures a running source connector to write to a different topic and checks that it
     * restarts and produces records to the new topic.
     *
     * <p>Steps: start the connector against {@code TOPIC_NAME}, confirm at least 100 records are
     * consumed from it, switch the connector's topic to {@code "another-topic"}, wait for one
     * connector start, and confirm at least 100 records are consumed from the new topic.
     */
    @Test
    public void testReconfigConnector() throws Exception {
        final String anotherTopic = "another-topic";
        final int minRecords = 100;
        final long recordTransferDurationMs = TimeUnit.SECONDS.toMillis(30L);
        final String tasksNotStarted = "Connector tasks did not start in time.";

        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
        connect.kafka().createTopic(anotherTopic, NUM_TOPIC_PARTITIONS);
        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Connect workers did not start in time.");
        connect.configureConnector(CONNECTOR_NAME, props);
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, tasksNotStarted);
        assertAtLeastRecordsConsumed(TOPIC_NAME, minRecords, recordTransferDurationMs);

        // Obtain the latch BEFORE submitting the new configuration so the restart cannot be missed.
        StartAndStopLatch restartLatch = connectorHandle.expectedStarts(1);
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, anotherTopic);
        connect.configureConnector(CONNECTOR_NAME, props);
        Assert.assertTrue(
                "Failed to alter connector configuration and see connector and tasks restart within "
                        + CONNECTOR_SETUP_DURATION_MS + "ms",
                restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));

        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, tasksNotStarted);
        assertAtLeastRecordsConsumed(anotherTopic, minRecords, recordTransferDurationMs);
    }

    /**
     * Consumes from {@code topic} and fails the test unless at least {@code minRecords} records
     * are returned.
     *
     * @param topic      topic to consume from
     * @param minRecords record count requested from the consumer and required by the assertion
     * @param timeoutMs  duration, in milliseconds, passed to the embedded cluster's consume call
     */
    private void assertAtLeastRecordsConsumed(String topic, int minRecords, long timeoutMs) throws Exception {
        int consumed = connect.kafka().consume(minRecords, timeoutMs, topic).count();
        Assert.assertTrue(
                "Not enough records produced by source connector. Expected at least: " + minRecords
                        + " but got " + consumed,
                consumed >= minRecords);
    }

    /**
     * Creates several connectors, deletes the last one, and waits until the assignment check in
     * {@code assertConnectorAndTasksAreUnique()} passes.
     */
    @Test
    public void testDeleteConnector() throws Exception {
        // Connectors are named CONNECTOR_NAME + index; the last one is the deletion target.
        final String lastConnector = CONNECTOR_NAME + (NUM_TASKS - 1);

        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Connect workers did not start in time.");

        for (int index = 0; index < NUM_TASKS; index++) {
            connect.configureConnector(CONNECTOR_NAME + index, props);
        }

        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
                lastConnector, NUM_TASKS, "Connector tasks did not start in time.");

        connect.deleteConnector(lastConnector);
        connect.assertions().assertConnectorAndTasksAreStopped(
                lastConnector, "Connector tasks did not stop in time.");

        TestUtils.waitForCondition(
                this::assertConnectorAndTasksAreUnique,
                WORKER_SETUP_DURATION_MS,
                "Connect and tasks are imbalanced between the workers.");
    }

    /**
     * Creates several connectors, adds one worker to the cluster, and waits until the assignment
     * check in {@code assertConnectorAndTasksAreUnique()} passes.
     */
    @Test
    public void testAddingWorker() throws Exception {
        // Connectors are named CONNECTOR_NAME + index; the last one is used as the probe.
        final String lastConnector = CONNECTOR_NAME + (NUM_TASKS - 1);
        final String workersNotStarted = "Connect workers did not start in time.";
        final String tasksNotStarted = "Connector tasks did not start in time.";

        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, workersNotStarted);

        for (int index = 0; index < NUM_TASKS; index++) {
            connect.configureConnector(CONNECTOR_NAME + index, props);
        }
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(lastConnector, NUM_TASKS, tasksNotStarted);

        connect.addWorker();
        // One worker was added, so the expectation is derived from the worker count rather
        // than from the (coincidentally equal) task count.
        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS + 1, workersNotStarted);
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(lastConnector, NUM_TASKS, tasksNotStarted);

        TestUtils.waitForCondition(
                this::assertConnectorAndTasksAreUnique,
                WORKER_SETUP_DURATION_MS,
                "Connect and tasks are imbalanced between the workers.");
    }

    /**
     * Creates several connectors, removes one worker from the cluster, and waits until the
     * assignment check in {@code assertConnectorAndTasksAreUnique()} passes.
     */
    @Test
    public void testRemovingWorker() throws Exception {
        // Connectors are named CONNECTOR_NAME + index; the last one is used as the probe.
        final String lastConnector = CONNECTOR_NAME + (NUM_TASKS - 1);
        final String workersNotStarted = "Connect workers did not start in time.";

        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, workersNotStarted);

        for (int index = 0; index < NUM_TASKS; index++) {
            connect.configureConnector(CONNECTOR_NAME + index, props);
        }
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
                lastConnector, NUM_TASKS, "Connector tasks did not start in time.");

        connect.removeWorker();
        // Exactly one worker fewer than the cluster started with.
        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1, workersNotStarted);

        TestUtils.waitForCondition(
                this::assertConnectorAndTasksAreUnique,
                WORKER_SETUP_DURATION_MS,
                "Connect and tasks are imbalanced between the workers.");
    }

    /**
     * Builds the baseline configuration for a {@code MonitorableSourceConnector}.
     *
     * <p>The returned map is a new mutable instance, so callers may modify it before
     * submitting it to the cluster.
     *
     * @param str name of the topic the connector is configured with
     * @return connector properties keyed by configuration name
     */
    private Map<String, String> defaultSourceConnectorProps(String str) {
        // Typed map instead of a raw HashMap so the return needs no unchecked conversion.
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        props.put("tasks.max", String.valueOf(NUM_TASKS));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, str);
        props.put("throughput", String.valueOf(10));
        props.put("messages.per.poll", String.valueOf(10));
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("topic.creation.default.replication.factor", String.valueOf(1));
        props.put("topic.creation.default.partitions", String.valueOf(1));
        return props;
    }

    /**
     * Groups the cluster's connectors and tasks by the worker they are reported on and asserts
     * that at least one connector and one task exist and that no two workers report an
     * identical assignment list.
     *
     * <p>Only {@link Exception}s are caught and turned into {@code false}. A failed JUnit
     * assertion throws {@link AssertionError}, which is not an {@code Exception}, so it
     * propagates to the caller instead.
     *
     * <p>NOTE(review): the per-worker lists are compared for equality only; nothing here checks
     * how evenly the assignments are spread. Confirm whether a balance check was intended.
     *
     * @return {@code true} if all assertions passed; {@code false} if the cluster state could
     *         not be read
     */
    private boolean assertConnectorAndTasksAreUnique() {
        try {
            // worker id -> names of the connectors / tasks reported on that worker
            Map<String, Collection<String>> connectorsByWorker = new HashMap<>();
            Map<String, Collection<String>> tasksByWorker = new HashMap<>();

            for (String connectorName : connect.connectors()) {
                ConnectorStateInfo status = connect.connectorStatus(connectorName);
                connectorsByWorker
                        .computeIfAbsent(status.connector().workerId(), worker -> new ArrayList<>())
                        .add(connectorName);
                status.tasks().forEach(task -> tasksByWorker
                        .computeIfAbsent(task.workerId(), worker -> new ArrayList<>())
                        .add(connectorName + "-" + task.id()));
            }

            // Largest per-worker count; 0 means nothing was reported at all.
            int maxConnectors = connectorsByWorker.values().stream().mapToInt(Collection::size).max().orElse(0);
            int maxTasks = tasksByWorker.values().stream().mapToInt(Collection::size).max().orElse(0);

            Assert.assertNotEquals("Found no connectors running!", 0, maxConnectors);
            Assert.assertNotEquals("Found no tasks running!", 0, maxTasks);
            Assert.assertEquals(
                    "Connector assignments are not unique: " + connectorsByWorker,
                    connectorsByWorker.values().size(),
                    connectorsByWorker.values().stream().distinct().count());
            Assert.assertEquals(
                    "Task assignments are not unique: " + tasksByWorker,
                    tasksByWorker.values().size(),
                    tasksByWorker.values().stream().distinct().count());
            return true;
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        }
    }
}
