package org.apache.kafka.connect.integration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.integration.BlockingConnectorTest;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.class */
public class OffsetsApiIntegrationTest {
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
    private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
    private static final int NUM_WORKERS = 3;
    private static final String CONNECTOR_NAME = "test-connector";
    private static final String TOPIC = "test-topic";
    private static final int NUM_TASKS = 2;
    private static final int NUM_RECORDS_PER_PARTITION = 10;
    private Map<String, String> workerProps;
    private EmbeddedConnectCluster.Builder connectBuilder;
    private EmbeddedConnectCluster connect;

    @Before
    public void setup() {
        Properties properties = new Properties();
        properties.put("transaction.state.log.replication.factor", "1");
        properties.put("transaction.state.log.min.isr", "1");
        this.workerProps = new HashMap();
        this.workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
        this.connectBuilder = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(NUM_WORKERS).brokerProps(properties).workerProps(this.workerProps);
    }

    @After
    public void tearDown() {
        this.connect.stop();
    }

    @Test
    public void testGetNonExistentConnectorOffsets() {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Assert.assertEquals(404L, Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.connectorOffsets("non-existent-connector");
        }).errorCode());
    }

    @Test
    public void testGetSinkConnectorOffsets() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), this.connect.kafka());
    }

    @Test
    public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map<String, String> baseSinkConnectorConfigs = baseSinkConnectorConfigs();
        baseSinkConnectorConfigs.put("consumer.override.group.id", "overridden-group-id");
        getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs, this.connect.kafka());
        Admin createAdminClient = this.connect.kafka().createAdminClient();
        try {
            Collection collection = (Collection) createAdminClient.listConsumerGroups().all().get();
            Assert.assertTrue(collection.stream().anyMatch(consumerGroupListing -> {
                return "overridden-group-id".equals(consumerGroupListing.groupId());
            }));
            Assert.assertTrue(collection.stream().noneMatch(consumerGroupListing2 -> {
                return SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing2.groupId());
            }));
            if (createAdminClient != null) {
                createAdminClient.close();
            }
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
        Objects.requireNonNull(embeddedKafkaCluster);
        AutoCloseable autoCloseable = embeddedKafkaCluster::stop;
        try {
            embeddedKafkaCluster.start();
            Map<String, String> baseSinkConnectorConfigs = baseSinkConnectorConfigs();
            baseSinkConnectorConfigs.put("consumer.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            baseSinkConnectorConfigs.put("admin.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs, embeddedKafkaCluster);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
        } catch (Throwable th) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void getAndVerifySinkConnectorOffsets(Map<String, String> map, EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        embeddedKafkaCluster.createTopic(TOPIC, 5);
        for (int i = 0; i < 5; i++) {
            for (int i2 = 0; i2 < NUM_RECORDS_PER_PARTITION; i2++) {
                embeddedKafkaCluster.produce(TOPIC, Integer.valueOf(i), "key", "value");
            }
        }
        this.connect.configureConnector(CONNECTOR_NAME, map);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets");
        for (int i3 = 0; i3 < 5; i3++) {
            for (int i4 = 0; i4 < NUM_RECORDS_PER_PARTITION; i4++) {
                embeddedKafkaCluster.produce(TOPIC, Integer.valueOf(i3), "key", "value");
            }
        }
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, 20, "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    @Test
    public void testGetSourceConnectorOffsets() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    @Test
    public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map<String, String> baseSourceConnectorConfigs = baseSourceConnectorConfigs();
        baseSourceConnectorConfigs.put("offsets.storage.topic", "custom-offsets-topic");
        getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs);
    }

    @Test
    public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
        Objects.requireNonNull(embeddedKafkaCluster);
        AutoCloseable autoCloseable = embeddedKafkaCluster::stop;
        try {
            embeddedKafkaCluster.start();
            Map<String, String> baseSourceConnectorConfigs = baseSourceConnectorConfigs();
            baseSourceConnectorConfigs.put("producer.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            baseSourceConnectorConfigs.put("admin.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
        } catch (Throwable th) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void getAndVerifySourceConnectorOffsets(Map<String, String> map) throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, map);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced");
        map.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(20));
        this.connect.configureConnector(CONNECTOR_NAME, map);
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 20, "Source connector offsets should reflect the expected number of records produced");
    }

    @Test
    public void testAlterOffsetsNonExistentConnector() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Assert.assertEquals(404L, Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.alterConnectorOffsets("non-existent-connector", new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(Collections.emptyMap(), Collections.emptyMap()))));
        }).errorCode());
    }

    @Test
    public void testAlterOffsetsNonStoppedConnector() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_TASKS; i++) {
            arrayList.add(new ConnectorOffset(Collections.singletonMap("task.id", "test-connector-" + i), Collections.singletonMap("saved", 5)));
        }
        Assert.assertEquals(400L, Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(arrayList));
        }).errorCode());
        this.connect.pauseConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksArePaused(CONNECTOR_NAME, NUM_TASKS, "Connector did not pause in time");
        Assert.assertEquals(400L, Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(arrayList));
        }).errorCode());
    }

    @Test
    public void testAlterSinkConnectorOffsets() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), this.connect.kafka());
    }

    @Test
    public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map<String, String> baseSinkConnectorConfigs = baseSinkConnectorConfigs();
        baseSinkConnectorConfigs.put("consumer.override.group.id", "overridden-group-id");
        alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs, this.connect.kafka());
        Admin createAdminClient = this.connect.kafka().createAdminClient();
        try {
            Collection collection = (Collection) createAdminClient.listConsumerGroups().all().get();
            Assert.assertTrue(collection.stream().anyMatch(consumerGroupListing -> {
                return "overridden-group-id".equals(consumerGroupListing.groupId());
            }));
            Assert.assertTrue(collection.stream().noneMatch(consumerGroupListing2 -> {
                return SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing2.groupId());
            }));
            if (createAdminClient != null) {
                createAdminClient.close();
            }
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
        Objects.requireNonNull(embeddedKafkaCluster);
        AutoCloseable autoCloseable = embeddedKafkaCluster::stop;
        try {
            embeddedKafkaCluster.start();
            Map<String, String> baseSinkConnectorConfigs = baseSinkConnectorConfigs();
            baseSinkConnectorConfigs.put("consumer.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            baseSinkConnectorConfigs.put("admin.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs, embeddedKafkaCluster);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
        } catch (Throwable th) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void alterAndVerifySinkConnectorOffsets(Map<String, String> map, EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        embeddedKafkaCluster.createTopic(TOPIC, NUM_WORKERS);
        for (int i = 0; i < NUM_WORKERS; i++) {
            for (int i2 = 0; i2 < NUM_RECORDS_PER_PARTITION; i2++) {
                embeddedKafkaCluster.produce(TOPIC, Integer.valueOf(i), "key", "value");
            }
        }
        this.connect.configureConnector(CONNECTOR_NAME, map);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, NUM_WORKERS, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets");
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("kafka_topic", TOPIC);
        hashMap.put("kafka_partition", 0);
        arrayList.add(new ConnectorOffset(hashMap, (Map) null));
        for (int i3 = 1; i3 < NUM_WORKERS; i3++) {
            HashMap hashMap2 = new HashMap();
            hashMap2.put("kafka_topic", TOPIC);
            hashMap2.put("kafka_partition", Integer.valueOf(i3));
            arrayList.add(new ConnectorOffset(hashMap2, Collections.singletonMap("kafka_offset", 5)));
        }
        MatcherAssert.assertThat(this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(arrayList)), CoreMatchers.containsString("The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, NUM_WORKERS - 1, 5, "Sink connector consumer group offsets should reflect the altered offsets");
        map.put("alter.offsets.result", "true");
        this.connect.configureConnector(CONNECTOR_NAME, map);
        arrayList.clear();
        for (int i4 = 1; i4 < NUM_WORKERS; i4++) {
            HashMap hashMap3 = new HashMap();
            hashMap3.put("kafka_topic", TOPIC);
            hashMap3.put("kafka_partition", Integer.valueOf(i4));
            arrayList.add(new ConnectorOffset(hashMap3, Collections.singletonMap("kafka_offset", Integer.valueOf(NUM_WORKERS))));
        }
        MatcherAssert.assertThat(this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(arrayList)), CoreMatchers.containsString("The offsets for this connector have been altered successfully"));
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, NUM_WORKERS - 1, NUM_WORKERS, "Sink connector consumer group offsets should reflect the altered offsets");
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not resume in time");
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, NUM_WORKERS, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    @Test
    public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.kafka().createTopic(TOPIC, 1);
        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
            this.connect.kafka().produce(TOPIC, 0, "key", "value");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", BlockingConnectorTest.BlockingSinkConnector.class.getName());
        hashMap.put("topics", TOPIC);
        hashMap.put(BlockingConnectorTest.Block.BLOCK_CONFIG, "Task::stop");
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector tasks did not start in time.");
        this.connect.stopConnector(CONNECTOR_NAME);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("kafka_topic", TOPIC);
        hashMap2.put("kafka_partition", 0);
        List singletonList = Collections.singletonList(new ConnectorOffset(hashMap2, (Map) null));
        MatcherAssert.assertThat(Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(singletonList));
        }).getMessage(), CoreMatchers.containsString("zombie sink task"));
    }

    @Test
    public void testAlterSinkConnectorOffsetsInvalidRequestBody() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorConfigs());
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        String endpointForResource = this.connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
        Response requestPatch = this.connect.requestPatch(endpointForResource, "{}");
        try {
            Assert.assertEquals(400L, requestPatch.getStatus());
            MatcherAssert.assertThat(requestPatch.getEntity().toString(), CoreMatchers.containsString("Partitions / offsets need to be provided for an alter offsets request"));
            if (requestPatch != null) {
                requestPatch.close();
            }
            Response requestPatch2 = this.connect.requestPatch(endpointForResource, "{\"offsets\": []}");
            try {
                Assert.assertEquals(400L, requestPatch2.getStatus());
                MatcherAssert.assertThat(requestPatch2.getEntity().toString(), CoreMatchers.containsString("Partitions / offsets need to be provided for an alter offsets request"));
                if (requestPatch2 != null) {
                    requestPatch2.close();
                }
                Response requestPatch3 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{}]}");
                try {
                    Assert.assertEquals(400L, requestPatch3.getStatus());
                    MatcherAssert.assertThat(requestPatch3.getEntity().toString(), CoreMatchers.containsString("The partition for a sink connector offset cannot be null or missing"));
                    if (requestPatch3 != null) {
                        requestPatch3.close();
                    }
                    Response requestPatch4 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"partition\": null, \"offset\": null}]}");
                    try {
                        Assert.assertEquals(400L, requestPatch4.getStatus());
                        MatcherAssert.assertThat(requestPatch4.getEntity().toString(), CoreMatchers.containsString("The partition for a sink connector offset cannot be null or missing"));
                        if (requestPatch4 != null) {
                            requestPatch4.close();
                        }
                        Response requestPatch5 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"partition\": {}, \"offset\": null}]}");
                        try {
                            Assert.assertEquals(400L, requestPatch5.getStatus());
                            MatcherAssert.assertThat(requestPatch5.getEntity().toString(), CoreMatchers.containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
                            if (requestPatch5 != null) {
                                requestPatch5.close();
                            }
                            Response requestPatch6 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"partition\": {\"kafka_topic\": \"test\", \"kafka_partition\": \"not a number\"}, \"offset\": null}]}");
                            try {
                                Assert.assertEquals(400L, requestPatch6.getStatus());
                                MatcherAssert.assertThat(requestPatch6.getEntity().toString(), CoreMatchers.containsString("Partition values for sink connectors need to be integers"));
                                if (requestPatch6 != null) {
                                    requestPatch6.close();
                                }
                                Response requestPatch7 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"partition\": {\"kafka_topic\": \"test\", \"kafka_partition\": 1}, \"offset\": {}}]}");
                                try {
                                    Assert.assertEquals(400L, requestPatch7.getStatus());
                                    MatcherAssert.assertThat(requestPatch7.getEntity().toString(), CoreMatchers.containsString("The offset for a sink connector should either be null or contain the key 'kafka_offset'"));
                                    if (requestPatch7 != null) {
                                        requestPatch7.close();
                                    }
                                    requestPatch2 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"partition\": {\"kafka_topic\": \"test\", \"kafka_partition\": 1}, \"offset\": {\"kafka_offset\": \"not a number\"}}]}");
                                    try {
                                        Assert.assertEquals(400L, requestPatch2.getStatus());
                                        MatcherAssert.assertThat(requestPatch2.getEntity().toString(), CoreMatchers.containsString("Offset values for sink connectors need to be integers"));
                                        if (requestPatch2 != null) {
                                            requestPatch2.close();
                                        }
                                    } finally {
                                    }
                                } finally {
                                }
                            } finally {
                                if (requestPatch6 != null) {
                                    try {
                                        requestPatch6.close();
                                    } catch (Throwable th) {
                                        th.addSuppressed(th);
                                    }
                                }
                            }
                        } finally {
                            if (requestPatch5 != null) {
                                try {
                                    requestPatch5.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                        }
                    } finally {
                        if (requestPatch4 != null) {
                            try {
                                requestPatch4.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        }
                    }
                } finally {
                    if (requestPatch3 != null) {
                        try {
                            requestPatch3.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                }
            } finally {
                if (requestPatch2 != null) {
                    try {
                        requestPatch2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                }
            }
        } finally {
            if (requestPatch != null) {
                try {
                    requestPatch.close();
                } catch (Throwable th6) {
                    th.addSuppressed(th6);
                }
            }
        }
    }

    @Test
    public void testAlterSourceConnectorOffsets() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    @Test
    public void testAlterSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map<String, String> baseSourceConnectorConfigs = baseSourceConnectorConfigs();
        baseSourceConnectorConfigs.put("offsets.storage.topic", "custom-offsets-topic");
        alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs);
    }

    @Test
    public void testAlterSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
        Objects.requireNonNull(embeddedKafkaCluster);
        AutoCloseable autoCloseable = embeddedKafkaCluster::stop;
        try {
            embeddedKafkaCluster.start();
            Map<String, String> baseSourceConnectorConfigs = baseSourceConnectorConfigs();
            baseSourceConnectorConfigs.put("producer.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            baseSourceConnectorConfigs.put("admin.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
        } catch (Throwable th) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
        this.workerProps.put("exactly.once.source.support", "enabled");
        this.connect = this.connectBuilder.workerProps(this.workerProps).build();
        this.connect.start();
        alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    public void alterAndVerifySourceConnectorOffsets(Map<String, String> map) throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, map);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced");
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_TASKS; i++) {
            arrayList.add(new ConnectorOffset(Collections.singletonMap("task.id", "test-connector-" + i), Collections.singletonMap("saved", 5)));
        }
        MatcherAssert.assertThat(this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(arrayList)), CoreMatchers.containsString("The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 5, "Source connector offsets should reflect the altered offsets");
        map.put("alter.offsets.result", "true");
        this.connect.configureConnector(CONNECTOR_NAME, map);
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < NUM_TASKS; i2++) {
            arrayList2.add(new ConnectorOffset(Collections.singletonMap("task.id", "test-connector-" + i2), Collections.singletonMap("saved", 7)));
        }
        MatcherAssert.assertThat(this.connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(arrayList2)), CoreMatchers.containsString("The offsets for this connector have been altered successfully"));
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 7, "Source connector offsets should reflect the altered offsets");
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not resume in time");
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced");
    }

    @Test
    public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        String endpointForResource = this.connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
        Response requestPatch = this.connect.requestPatch(endpointForResource, "[]");
        try {
            Assert.assertEquals(500L, requestPatch.getStatus());
            MatcherAssert.assertThat(requestPatch.getEntity().toString(), CoreMatchers.containsString("Cannot deserialize value"));
            if (requestPatch != null) {
                requestPatch.close();
            }
            Response requestPatch2 = this.connect.requestPatch(endpointForResource, "{}");
            try {
                Assert.assertEquals(400L, requestPatch2.getStatus());
                MatcherAssert.assertThat(requestPatch2.getEntity().toString(), CoreMatchers.containsString("Partitions / offsets need to be provided for an alter offsets request"));
                if (requestPatch2 != null) {
                    requestPatch2.close();
                }
                Response requestPatch3 = this.connect.requestPatch(endpointForResource, "{\"key\": []}");
                try {
                    Assert.assertEquals(500L, requestPatch3.getStatus());
                    MatcherAssert.assertThat(requestPatch3.getEntity().toString(), CoreMatchers.containsString("Unrecognized field"));
                    if (requestPatch3 != null) {
                        requestPatch3.close();
                    }
                    Response requestPatch4 = this.connect.requestPatch(endpointForResource, "{\"offsets\": []}");
                    try {
                        Assert.assertEquals(400L, requestPatch4.getStatus());
                        MatcherAssert.assertThat(requestPatch4.getEntity().toString(), CoreMatchers.containsString("Partitions / offsets need to be provided for an alter offsets request"));
                        if (requestPatch4 != null) {
                            requestPatch4.close();
                        }
                        Response requestPatch5 = this.connect.requestPatch(endpointForResource, "{\"offsets\": {}}");
                        try {
                            Assert.assertEquals(500L, requestPatch5.getStatus());
                            MatcherAssert.assertThat(requestPatch5.getEntity().toString(), CoreMatchers.containsString("Cannot deserialize value"));
                            if (requestPatch5 != null) {
                                requestPatch5.close();
                            }
                            Response requestPatch6 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [123]}");
                            try {
                                Assert.assertEquals(500L, requestPatch6.getStatus());
                                MatcherAssert.assertThat(requestPatch6.getEntity().toString(), CoreMatchers.containsString("Cannot construct instance"));
                                if (requestPatch6 != null) {
                                    requestPatch6.close();
                                }
                                Response requestPatch7 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"key\": \"val\"}]}");
                                try {
                                    Assert.assertEquals(500L, requestPatch7.getStatus());
                                    MatcherAssert.assertThat(requestPatch7.getEntity().toString(), CoreMatchers.containsString("Unrecognized field"));
                                    if (requestPatch7 != null) {
                                        requestPatch7.close();
                                    }
                                    requestPatch2 = this.connect.requestPatch(endpointForResource, "{\"offsets\": [{\"partition\": []]}]}");
                                    try {
                                        Assert.assertEquals(500L, requestPatch2.getStatus());
                                        MatcherAssert.assertThat(requestPatch2.getEntity().toString(), CoreMatchers.containsString("Cannot deserialize value"));
                                        if (requestPatch2 != null) {
                                            requestPatch2.close();
                                        }
                                    } finally {
                                    }
                                } finally {
                                }
                            } finally {
                                if (requestPatch6 != null) {
                                    try {
                                        requestPatch6.close();
                                    } catch (Throwable th) {
                                        th.addSuppressed(th);
                                    }
                                }
                            }
                        } finally {
                            if (requestPatch5 != null) {
                                try {
                                    requestPatch5.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                        }
                    } finally {
                        if (requestPatch4 != null) {
                            try {
                                requestPatch4.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        }
                    }
                } finally {
                    if (requestPatch3 != null) {
                        try {
                            requestPatch3.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                }
            } finally {
                if (requestPatch2 != null) {
                    try {
                        requestPatch2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                }
            }
        } finally {
            if (requestPatch != null) {
                try {
                    requestPatch.close();
                } catch (Throwable th6) {
                    th.addSuppressed(th6);
                }
            }
        }
    }

    @Test
    public void testResetSinkConnectorOffsets() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), this.connect.kafka());
    }

    @Test
    public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map<String, String> baseSinkConnectorConfigs = baseSinkConnectorConfigs();
        baseSinkConnectorConfigs.put("consumer.override.group.id", "overridden-group-id");
        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs, this.connect.kafka());
        Admin createAdminClient = this.connect.kafka().createAdminClient();
        try {
            Collection collection = (Collection) createAdminClient.listConsumerGroups().all().get();
            Assert.assertTrue(collection.stream().anyMatch(consumerGroupListing -> {
                return "overridden-group-id".equals(consumerGroupListing.groupId());
            }));
            Assert.assertTrue(collection.stream().noneMatch(consumerGroupListing2 -> {
                return SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing2.groupId());
            }));
            if (createAdminClient != null) {
                createAdminClient.close();
            }
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
        Objects.requireNonNull(embeddedKafkaCluster);
        AutoCloseable autoCloseable = embeddedKafkaCluster::stop;
        try {
            embeddedKafkaCluster.start();
            Map<String, String> baseSinkConnectorConfigs = baseSinkConnectorConfigs();
            baseSinkConnectorConfigs.put("consumer.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            baseSinkConnectorConfigs.put("admin.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs, embeddedKafkaCluster);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
        } catch (Throwable th) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void resetAndVerifySinkConnectorOffsets(Map<String, String> map, EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        embeddedKafkaCluster.createTopic(TOPIC, NUM_WORKERS);
        for (int i = 0; i < NUM_WORKERS; i++) {
            for (int i2 = 0; i2 < NUM_RECORDS_PER_PARTITION; i2++) {
                embeddedKafkaCluster.produce(TOPIC, Integer.valueOf(i), "key", "value");
            }
        }
        this.connect.configureConnector(CONNECTOR_NAME, map);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, NUM_WORKERS, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets");
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        MatcherAssert.assertThat(this.connect.resetConnectorOffsets(CONNECTOR_NAME), CoreMatchers.containsString("The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
        MatcherAssert.assertThat(this.connect.resetConnectorOffsets(CONNECTOR_NAME), CoreMatchers.containsString("The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not resume in time");
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, NUM_WORKERS, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    @Test
    public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.kafka().createTopic(TOPIC, 1);
        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
            this.connect.kafka().produce(TOPIC, 0, "key", "value");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", BlockingConnectorTest.BlockingSinkConnector.class.getName());
        hashMap.put("topics", TOPIC);
        hashMap.put(BlockingConnectorTest.Block.BLOCK_CONFIG, "Task::stop");
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector tasks did not start in time.");
        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets");
        this.connect.stopConnector(CONNECTOR_NAME);
        MatcherAssert.assertThat(Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.resetConnectorOffsets(CONNECTOR_NAME);
        }).getMessage(), CoreMatchers.containsString("zombie sink task"));
    }

    @Test
    public void testResetSourceConnectorOffsets() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    @Test
    public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Map<String, String> baseSourceConnectorConfigs = baseSourceConnectorConfigs();
        baseSourceConnectorConfigs.put("offsets.storage.topic", "custom-offsets-topic");
        resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs);
    }

    @Test
    public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
        Objects.requireNonNull(embeddedKafkaCluster);
        AutoCloseable autoCloseable = embeddedKafkaCluster::stop;
        try {
            embeddedKafkaCluster.start();
            Map<String, String> baseSourceConnectorConfigs = baseSourceConnectorConfigs();
            baseSourceConnectorConfigs.put("producer.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            baseSourceConnectorConfigs.put("admin.override.bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
            resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
        } catch (Throwable th) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
        this.workerProps.put("exactly.once.source.support", "enabled");
        this.connect = this.connectBuilder.workerProps(this.workerProps).build();
        this.connect.start();
        resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    public void resetAndVerifySourceConnectorOffsets(Map<String, String> map) throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, map);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced");
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        MatcherAssert.assertThat(this.connect.resetConnectorOffsets(CONNECTOR_NAME), CoreMatchers.containsString("The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
        MatcherAssert.assertThat(this.connect.resetConnectorOffsets(CONNECTOR_NAME), CoreMatchers.containsString("The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not resume in time");
        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced");
    }

    private Map<String, String> baseSinkConnectorConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(NUM_TASKS));
        hashMap.put("topics", TOPIC);
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        return hashMap;
    }

    private Map<String, String> baseSourceConnectorConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(NUM_TASKS));
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, TOPIC);
        hashMap.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3");
        hashMap.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(NUM_RECORDS_PER_PARTITION));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.default.replication.factor", "1");
        hashMap.put("topic.creation.default.partitions", "1");
        return hashMap;
    }

    private void verifyExpectedSinkConnectorOffsets(String str, String str2, int i, int i2, String str3) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            ConnectorOffsets connectorOffsets = this.connect.connectorOffsets(str);
            if (connectorOffsets.offsets().size() != i) {
                return false;
            }
            for (ConnectorOffset connectorOffset : connectorOffsets.offsets()) {
                Assert.assertEquals(str2, connectorOffset.partition().get("kafka_topic"));
                if (((Integer) connectorOffset.offset().get("kafka_offset")).intValue() != i2) {
                    return false;
                }
            }
            return true;
        }, OFFSET_READ_TIMEOUT_MS, str3);
    }

    private void verifyExpectedSourceConnectorOffsets(String str, int i, int i2, String str2) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            ConnectorOffsets connectorOffsets = this.connect.connectorOffsets(str);
            if (connectorOffsets.offsets().size() != i) {
                return false;
            }
            for (ConnectorOffset connectorOffset : connectorOffsets.offsets()) {
                Assert.assertTrue(((String) connectorOffset.partition().get("task.id")).startsWith(CONNECTOR_NAME));
                if (((Integer) connectorOffset.offset().get("saved")).intValue() != i2) {
                    return false;
                }
            }
            return true;
        }, OFFSET_READ_TIMEOUT_MS, str2);
    }

    private void verifyEmptyConnectorOffsets(String str) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return this.connect.connectorOffsets(str).offsets().isEmpty();
        }, OFFSET_READ_TIMEOUT_MS, "Connector offsets should be empty after resetting offsets");
    }
}
