package org.apache.kafka.connect.integration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example integration test that runs a sink connector and a source connector against an
 * {@link EmbeddedConnectCluster}.
 *
 * <p>Each test starts a fresh cluster with {@link #NUM_WORKERS} workers and a single broker
 * (automatic topic creation disabled on the broker), registers a connector named
 * {@link #CONNECTOR_NAME}, and then waits on the shared {@link ConnectorHandle} for the expected
 * number of records and commits.
 */
@Category({IntegrationTest.class})
public class ExampleConnectIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(ExampleConnectIntegrationTest.class);

    private static final int NUM_RECORDS_PRODUCED = 2000;
    private static final int NUM_TOPIC_PARTITIONS = 3;
    private static final int NUM_TASKS = 3;
    private static final int NUM_WORKERS = 3;
    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    private static final String CONNECTOR_NAME = "simple-conn";
    private static final String TOPIC_NAME = "test-topic";
    private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
    private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
    private static final String VALIDATION_ERROR_COUNT_MESSAGE =
            "Validating connector configuration produced an unexpected number or errors.";

    private EmbeddedConnectCluster connect;
    private ConnectorHandle connectorHandle;

    @Rule
    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);

    /**
     * Builds and starts the embedded Connect cluster, then obtains the handle used to observe
     * the connector under test.
     */
    @Before
    public void setup() throws ClassNotFoundException {
        // Connect worker properties.
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("offset.flush.interval.ms", String.valueOf(5000));

        // Kafka broker properties: topics must be created explicitly by the tests.
        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");

        connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
                .numBrokers(1)
                .workerProps(workerProps)
                .brokerProps(brokerProps)
                .build();
        connect.start();

        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
    }

    /**
     * Removes the connector handle and stops the embedded cluster.
     *
     * <p>The cluster is stopped even if removing the handle throws, and the stop is skipped when
     * {@link #setup()} failed before the cluster was built, so a setup failure is not masked by
     * a {@link NullPointerException} raised here.
     */
    @After
    public void close() {
        try {
            RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
        } finally {
            if (connect != null) {
                connect.stop();
            }
        }
    }

    /**
     * Configures a sink connector, produces {@link #NUM_RECORDS_PRODUCED} records to the test
     * topic and waits for the connector handle to report the expected records and commits.
     *
     * @throws Exception if any cluster interaction or wait fails
     */
    @Test
    public void testSinkConnector() throws Exception {
        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", SINK_CONNECTOR_CLASS_NAME);
        props.put("tasks.max", String.valueOf(NUM_TASKS));
        props.put("topics", TOPIC_NAME);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());

        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);

        // Without a "name" entry the configuration is expected to yield exactly one validation error.
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
                SINK_CONNECTOR_CLASS_NAME, props, 1, VALIDATION_ERROR_COUNT_MESSAGE);

        // Once the name is supplied the configuration is expected to validate cleanly.
        props.put("name", CONNECTOR_NAME);
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
                SINK_CONNECTOR_CLASS_NAME, props, 0, VALIDATION_ERROR_COUNT_MESSAGE);

        connect.configureConnector(CONNECTOR_NAME, props);

        TestUtils.waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS,
                "Connector tasks were not assigned a partition each.");

        // Spread the records round-robin over the topic's partitions.
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            connect.kafka().produce(TOPIC_NAME, i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
        }

        // Confirm the records are readable from the topic before waiting on the connector.
        Assert.assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
                connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, TOPIC_NAME).count());

        // NOTE(review): presumably these block until the expected counts are reached or the
        // timeout elapses - confirm against ConnectorHandle.
        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);

        connect.deleteConnector(CONNECTOR_NAME);
    }

    /**
     * Configures a source connector writing to the test topic, waits for the connector handle to
     * report the expected records and commits, and checks that at least
     * {@link #NUM_RECORDS_PRODUCED} records can be consumed from the topic.
     *
     * @throws Exception if any cluster interaction or wait fails
     */
    @Test
    public void testSourceConnector() throws Exception {
        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", SOURCE_CONNECTOR_CLASS_NAME);
        props.put("tasks.max", String.valueOf(NUM_TASKS));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, TOPIC_NAME);
        props.put("throughput", String.valueOf(500));
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("topic.creation.default.replication.factor", String.valueOf(1));
        props.put("topic.creation.default.partitions", String.valueOf(1));

        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);

        // Without a "name" entry the configuration is expected to yield exactly one validation error.
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
                SOURCE_CONNECTOR_CLASS_NAME, props, 1, VALIDATION_ERROR_COUNT_MESSAGE);

        // Once the name is supplied the configuration is expected to validate cleanly.
        props.put("name", CONNECTOR_NAME);
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
                SOURCE_CONNECTOR_CLASS_NAME, props, 0, VALIDATION_ERROR_COUNT_MESSAGE);

        connect.configureConnector(CONNECTOR_NAME, props);

        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);

        int recordCount = connect.kafka()
                .consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, TOPIC_NAME)
                .count();
        Assert.assertTrue(
                "Not enough records produced by source connector. Expected at least: "
                        + NUM_RECORDS_PRODUCED + " but got " + recordCount,
                recordCount >= NUM_RECORDS_PRODUCED);

        connect.deleteConnector(CONNECTOR_NAME);
    }

    /**
     * Polling condition used while the sink connector starts up.
     *
     * @return {@code true} when the connector status reports {@link #NUM_TASKS} tasks and every
     *         task handle reports exactly one assigned partition; {@code false} otherwise,
     *         including when the status lookup throws (the exception is logged and the caller
     *         is expected to poll again)
     */
    private boolean checkForPartitionAssignment() {
        try {
            ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME);
            return status != null
                    && status.tasks().size() == NUM_TASKS
                    && connectorHandle.tasks().stream()
                            .allMatch(task -> task.numPartitionsAssigned() == 1);
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        }
    }
}
