package org.apache.kafka.connect.integration;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.class */
public class ErrorHandlingIntegrationTest {

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private static final int NUM_WORKERS = 1;
    private static final String DLQ_TOPIC = "my-connector-errors";
    private static final String CONNECTOR_NAME = "error-conn";
    private static final String TASK_ID = "error-conn-0";
    private static final int NUM_RECORDS_PRODUCED = 20;
    private static final int EXPECTED_CORRECT_RECORDS = 19;
    private static final int EXPECTED_INCORRECT_RECORDS = 1;
    private static final int NUM_TASKS = 1;
    private EmbeddedConnectCluster connect;
    private ConnectorHandle connectorHandle;
    private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(30);

    /* loaded from: input_file:org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest$FaultyPassthrough.class */
    public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
        static final ConfigDef CONFIG_DEF = new ConfigDef();
        static final int BAD_RECORD_VAL_RETRIABLE = 4;
        static final int BAD_RECORD_VAL = 7;
        private boolean shouldFail = true;

        public String version() {
            return "1.0";
        }

        public R apply(R r) {
            if ("value-4".equals(r.value()) && this.shouldFail) {
                this.shouldFail = false;
                throw new RetriableException("Error when value='value-4'. A reattempt with this record will succeed.");
            }
            if ("value-7".equals(r.value())) {
                throw new RetriableException("Error when value='value-7'");
            }
            return r;
        }

        public ConfigDef config() {
            return CONFIG_DEF;
        }

        public void close() {
        }

        public void configure(Map<String, ?> map) {
        }
    }

    @Before
    public void setup() throws InterruptedException {
        this.connect = new EmbeddedConnectCluster.Builder().build();
        this.connect.start();
        this.connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
        this.connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
    }

    @After
    public void close() {
        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
        this.connect.stop();
    }

    @Test
    public void testSkipRetryAndDLQWithHeaders() throws Exception {
        this.connect.kafka().createTopic("test-topic");
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topics", "test-topic");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("transforms", "failing_transform");
        hashMap.put("transforms.failing_transform.type", FaultyPassthrough.class.getName());
        hashMap.put("errors.log.enable", "true");
        hashMap.put("errors.log.include.messages", "true");
        hashMap.put("errors.deadletterqueue.topic.name", DLQ_TOPIC);
        hashMap.put("errors.deadletterqueue.context.headers.enable", "true");
        hashMap.put("errors.deadletterqueue.topic.replication.factor", "1");
        hashMap.put("errors.tolerance", "all");
        hashMap.put("errors.retry.timeout", "1000");
        this.connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector tasks did not start in time.");
        TestUtils.waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS, "Connector task was not assigned a partition.");
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            this.connect.kafka().produce("test-topic", "key-" + i, "value-" + i);
        }
        log.info("Consuming records from test topic");
        int i2 = 0;
        Iterator it = this.connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic").iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            String str = new String((byte[]) consumerRecord.key());
            String str2 = new String((byte[]) consumerRecord.value());
            log.debug("Consumed record (key='{}', value='{}') from topic {}", new Object[]{str, str2, consumerRecord.topic()});
            Assert.assertEquals("Unexpected key", str, "key-" + i2);
            Assert.assertEquals("Unexpected value", str2, "value-" + i2);
            i2++;
        }
        this.connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS);
        log.info("Consuming records from test topic");
        Iterator it2 = this.connect.kafka().consume(1, CONSUME_MAX_DURATION_MS, DLQ_TOPIC).iterator();
        while (it2.hasNext()) {
            ConsumerRecord consumerRecord2 = (ConsumerRecord) it2.next();
            log.debug("Consumed record (key={}, value={}) from dead letter queue topic {}", new Object[]{new String((byte[]) consumerRecord2.key()), new String((byte[]) consumerRecord2.value()), DLQ_TOPIC});
            Assert.assertTrue(consumerRecord2.headers().toArray().length > 0);
            assertValue("test-topic", consumerRecord2.headers(), "__connect.errors.topic");
            assertValue(RetriableException.class.getName(), consumerRecord2.headers(), "__connect.errors.exception.class.name");
            assertValue("Error when value='value-7'", consumerRecord2.headers(), "__connect.errors.exception.message");
        }
        this.connect.deleteConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME, "Connector tasks did not stop in time.");
    }

    @Test
    public void testErrantRecordReporter() throws Exception {
        this.connect.kafka().createTopic("test-topic");
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", ErrantRecordSinkConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topics", "test-topic");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("errors.log.enable", "true");
        hashMap.put("errors.log.include.messages", "true");
        hashMap.put("errors.deadletterqueue.topic.name", DLQ_TOPIC);
        hashMap.put("errors.deadletterqueue.context.headers.enable", "true");
        hashMap.put("errors.deadletterqueue.topic.replication.factor", "1");
        hashMap.put("errors.tolerance", "all");
        hashMap.put("errors.retry.timeout", "1000");
        this.connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector tasks did not start in time.");
        TestUtils.waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS, "Connector task was not assigned a partition.");
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            this.connect.kafka().produce("test-topic", "key-" + i, "value-" + i);
        }
        log.info("Consuming records from test topic");
        int i2 = 0;
        Iterator it = this.connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic").iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            String str = new String((byte[]) consumerRecord.key());
            String str2 = new String((byte[]) consumerRecord.value());
            log.debug("Consumed record (key='{}', value='{}') from topic {}", new Object[]{str, str2, consumerRecord.topic()});
            Assert.assertEquals("Unexpected key", str, "key-" + i2);
            Assert.assertEquals("Unexpected value", str2, "value-" + i2);
            i2++;
        }
        this.connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS);
        log.info("Consuming records from test topic");
        this.connect.kafka().consume(1, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
        this.connect.deleteConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME, "Connector tasks did not stop in time.");
    }

    private boolean checkForPartitionAssignment() {
        try {
            ConnectorStateInfo connectorStatus = this.connect.connectorStatus(CONNECTOR_NAME);
            if (connectorStatus != null && connectorStatus.tasks().size() == 1) {
                if (this.connectorHandle.taskHandle(TASK_ID).numPartitionsAssigned() == 1) {
                    return true;
                }
            }
            return false;
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        }
    }

    private void assertValue(String str, Headers headers, String str2) {
        byte[] value = headers.lastHeader(str2).value();
        if (str == null && value == null) {
            return;
        }
        if (str == null || value == null) {
            Assert.fail();
        }
        Assert.assertEquals(str, new String(value));
    }
}
