package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.source.TransactionContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.test.util.ConcurrencyUtils;
import org.apache.kafka.connect.test.util.MockitoUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.OngoingStubbing;
import org.mockito.verification.VerificationMode;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.class */
public class ExactlyOnceWorkerSourceTaskTest {
    /** Topic name; the same value appears as the literal "topic" in the connector properties built below. */
    private static final String TOPIC = "topic";

    // Configuration and runtime objects that are (re)built in setup() for every test.
    private WorkerConfig config;
    private SourceConnectorConfig sourceConfig;
    private Plugins plugins;
    private MockConnectMetrics metrics;

    @Mock
    private ErrorHandlingMetrics errorHandlingMetrics;
    private Time time;

    /** The task under test; created on demand by the createWorkerTask(...) helpers. */
    private ExactlyOnceWorkerSourceTask workerTask;

    // Mocked collaborators handed to the task under test.
    @Mock
    private SourceTask sourceTask;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<SourceRecord> transformationChain;

    @Mock
    private Producer<byte[], byte[]> producer;

    @Mock
    private TopicAdmin admin;

    @Mock
    private CloseableOffsetStorageReader offsetReader;

    @Mock
    private OffsetStorageWriter offsetWriter;

    @Mock
    private ClusterConfigState clusterConfigState;

    @Mock
    private TaskStatus.Listener statusListener;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private ConnectorOffsetBackingStore offsetStore;

    @Mock
    private Runnable preProducerCheck;

    @Mock
    private Runnable postProducerCheck;

    // These constants have no initializer here; they are assigned outside the visible declarations
    // (presumably in a static initializer block further down the file).
    private static final TaskConfig TASK_CONFIG;
    private static final SourceRecord SOURCE_RECORD_1;
    private static final SourceRecord SOURCE_RECORD_2;
    private static final List<SourceRecord> RECORDS;

    /** Test parameter supplied by the Parameterized runner through the constructor. */
    private final boolean enableTopicCreation;
    private Future<?> workerTaskFuture;

    // Fixture values used to build source records and their serialized forms.
    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, ?> OFFSET = offset(12);
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long VALUE_1 = 12L;
    private static final Long VALUE_2 = 13L;
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
    private static final Map<String, String> TASK_PROPS = new HashMap<>();

    /** Single-threaded executor owned by this test instance. */
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);

    @Rule
    public MockitoRule rule = MockitoJUnit.rule();

    // State shared with the stubbed SourceTask.poll() answer installed in setup():
    // the latch is counted down on every poll and the record list is what poll() returns.
    private final AtomicReference<CountDownLatch> pollLatch = new AtomicReference<>(new CountDownLatch(0));
    private final AtomicReference<List<SourceRecord>> pollRecords = new AtomicReference<>(RECORDS);
    private boolean taskStarted = false;

    /**
     * Abstract {@link SourceTask} subtype private to this test class; it declares no members
     * beyond a private constructor.
     */
    private abstract static class TestSourceTask extends SourceTask {
        private TestSourceTask() {
            super();
        }
    }

    /**
     * Supplies the values for the single boolean test parameter: each test runs once with
     * {@code false} and once with {@code true}.
     *
     * @return the two parameter values, {@code false} first
     */
    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        final List<Boolean> topicCreationModes = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
        return topicCreationModes;
    }

    /**
     * Creates a test instance for one parameter value.
     *
     * @param z stored as {@code enableTopicCreation}; it is written into the worker's
     *          "topic.creation.enable" property and passed to several connector configs
     */
    public ExactlyOnceWorkerSourceTaskTest(boolean z) {
        enableTopicCreation = z;
    }

    /**
     * Builds the worker and connector configuration, fresh metrics and the system clock, and
     * installs the default stubs for the offset store and {@code SourceTask.poll()}.
     */
    @Before
    public void setup() throws Exception {
        final Map<String, String> workerSettings = workerProps();
        plugins = new Plugins(workerSettings);
        config = new StandaloneConfig(workerSettings);
        sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorProps(), true);
        metrics = new MockConnectMetrics();
        time = Time.SYSTEM;

        Mockito.when(offsetStore.primaryOffsetsTopic()).thenReturn("offsets-topic");

        // Default poll behaviour: hand back the currently configured batch, signalling the
        // poll latch first and pausing briefly before returning.
        Mockito.when(sourceTask.poll()).thenAnswer(invocation -> {
            final List<SourceRecord> batch = pollRecords.get();
            pollLatch.get().countDown();
            Thread.sleep(10L);
            return batch;
        });
    }

    /**
     * Verifies that no unexpected interactions happened on the mocks, then releases the
     * resources owned by this test instance.
     *
     * <p>Cleanup runs in {@code finally} blocks so that the metrics are stopped and the
     * executor thread is terminated even when one of the verifications fails.
     */
    @After
    public void teardown() throws Exception {
        try {
            // Mark these interactions as verified, however often they occurred, so that the
            // verifyNoMoreInteractions call below does not reject them.
            Mockito.verify(sourceTask, MockitoUtils.anyTimes()).poll();
            Mockito.verify(sourceTask, MockitoUtils.anyTimes()).commit();
            Mockito.verify(sourceTask, MockitoUtils.anyTimes()).commitRecord(ArgumentMatchers.any(), ArgumentMatchers.any());
            Mockito.verify(offsetWriter, MockitoUtils.anyTimes()).offset(PARTITION, OFFSET);
            Mockito.verify(offsetWriter, MockitoUtils.anyTimes()).beginFlush();
            Mockito.verify(offsetWriter, MockitoUtils.anyTimes()).doFlush(ArgumentMatchers.any());
            if (enableTopicCreation) {
                Mockito.verify(admin, MockitoUtils.anyTimes()).describeTopics(new String[]{"topic"});
            }
            Mockito.verify(statusBackingStore, MockitoUtils.anyTimes()).getTopic(ArgumentMatchers.any(), ArgumentMatchers.any());
            Mockito.verify(offsetStore, MockitoUtils.anyTimes()).primaryOffsetsTopic();

            Mockito.verifyNoMoreInteractions(statusListener, producer, sourceTask, admin, offsetWriter,
                    statusBackingStore, offsetStore, preProducerCheck, postProducerCheck);
        } finally {
            try {
                if (metrics != null) {
                    metrics.stop();
                }
            } finally {
                // Each test instance creates its own single-thread executor; terminate it so
                // its worker thread does not outlive the test.
                executor.shutdownNow();
            }
        }
    }

    /**
     * Builds the worker-level properties used to create the {@code Plugins} and
     * {@code StandaloneConfig} instances in {@code setup()}.
     *
     * @return a new mutable map; "topic.creation.enable" reflects this run's test parameter
     */
    private Map<String, String> workerProps() {
        final Map<String, String> props = new HashMap<>();
        props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("internal.key.converter.schemas.enable", "false");
        props.put("internal.value.converter.schemas.enable", "false");
        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
        props.put("topic.creation.enable", String.valueOf(enableTopicCreation));
        return props;
    }

    /**
     * Builds the connector properties using {@code SourceTask.TransactionBoundary.DEFAULT}.
     *
     * @return a new mutable property map
     */
    private Map<String, String> sourceConnectorProps() {
        final SourceTask.TransactionBoundary boundary = SourceTask.TransactionBoundary.DEFAULT;
        return sourceConnectorProps(boundary);
    }

    /**
     * Builds the source connector properties for the given transaction boundary mode.
     *
     * <p>Two topic creation groups are declared: "foo", which includes "topic", and "bar",
     * which includes everything but excludes "topic".
     *
     * @param transactionBoundary value written (via {@code toString()}) to "transaction.boundary"
     * @return a new mutable property map that callers may extend
     */
    private Map<String, String> sourceConnectorProps(SourceTask.TransactionBoundary transactionBoundary) {
        final Map<String, String> props = new HashMap<>();
        props.put("name", "foo-connector");
        props.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        props.put("tasks.max", String.valueOf(1));
        props.put("topic", "topic");
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("topic.creation.groups", String.join(",", "foo", "bar"));
        props.put("topic.creation.default.replication.factor", String.valueOf(1));
        props.put("topic.creation.default.partitions", String.valueOf(1));
        props.put("transaction.boundary", transactionBoundary.toString());
        props.put("topic.creation.foo.include", "topic");
        props.put("topic.creation.bar.include", ".*");
        props.put("topic.creation.bar.exclude", "topic");
        return props;
    }

    /**
     * Wraps an integer in a single-entry source offset map.
     *
     * @param i the offset value
     * @return an immutable map holding {@code i} (boxed) under the key "key"
     */
    private static Map<String, ?> offset(int i) {
        final Integer boxed = i;
        return Collections.singletonMap("key", boxed);
    }

    /**
     * Creates the task under test with target state {@code STARTED} and the mocked key, value
     * and header converters.
     */
    private void createWorkerTask() {
        createWorkerTask(TargetState.STARTED, keyConverter, valueConverter, headerConverter);
    }

    /**
     * Creates the task under test with the mocked key, value and header converters.
     *
     * @param targetState initial target state handed to the task
     */
    private void createWorkerTask(TargetState targetState) {
        createWorkerTask(targetState, keyConverter, valueConverter, headerConverter);
    }

    /**
     * Instantiates {@link ExactlyOnceWorkerSourceTask} from this test's mocks and configuration
     * and stores it in {@code workerTask}.
     *
     * @param targetState     initial target state handed to the task
     * @param converter       key converter
     * @param converter2      value converter
     * @param headerConverter header converter
     */
    private void createWorkerTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter) {
        workerTask = new ExactlyOnceWorkerSourceTask(
                taskId,
                sourceTask,
                statusListener,
                targetState,
                converter,
                converter2,
                headerConverter,
                transformationChain,
                producer,
                admin,
                TopicCreationGroup.configuredGroups(sourceConfig),
                offsetReader,
                offsetWriter,
                offsetStore,
                config,
                clusterConfigState,
                metrics,
                errorHandlingMetrics,
                plugins.delegatingLoader(),
                time,
                RetryWithToleranceOperatorTest.NOOP_OPERATOR,
                statusBackingStore,
                sourceConfig,
                // Invokes run() on whatever it is handed, directly on the calling thread.
                action -> action.run(),
                preProducerCheck,
                postProducerCheck,
                Collections::emptyList);
    }

    /** After {@code removeMetrics()}, no task or source-task group metrics may remain registered. */
    @Test
    public void testRemoveMetrics() {
        createWorkerTask();
        workerTask.removeMetrics();
        final Set<MetricName> remaining = filterToTaskMetrics(metrics.metrics().metrics().keySet());
        Assert.assertEquals(Collections.emptySet(), remaining);
    }

    /**
     * Keeps only the metric names whose group equals the registry's task group name or its
     * source-task group name.
     *
     * @param set metric names to filter; not modified
     * @return a new set containing the matching names
     */
    private Set<MetricName> filterToTaskMetrics(Set<MetricName> set) {
        return set.stream()
                .filter(name -> {
                    final String group = name.group();
                    return metrics.registry().taskGroupName().equals(group)
                            || metrics.registry().sourceTaskGroupName().equals(group);
                })
                .collect(Collectors.toSet());
    }

    /**
     * A task created with target state {@code PAUSED} must report the pause to the status
     * listener and must never start or poll the source task.
     */
    @Test
    public void testStartPaused() throws Exception {
        createWorkerTask(TargetState.PAUSED);

        final CountDownLatch pauseLatch = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            pauseLatch.countDown();
            return null;
        }).when(statusListener).onPause(ArgumentMatchers.eq(taskId));

        startTaskThread();
        final boolean pausedInTime = pauseLatch.await(5L, TimeUnit.SECONDS);
        Assert.assertTrue(pausedInTime);
        awaitShutdown();

        Mockito.verify(statusListener).onPause(taskId);
        Mockito.verify(sourceTask, Mockito.never()).start(ArgumentMatchers.any());
        Mockito.verify(sourceTask, Mockito.never()).poll();
        verifyCleanShutdown();
        assertPollMetrics(0);
    }

    /**
     * Pausing a running task must notify the status listener, and at most one further poll
     * may be observed after the pause was requested.
     */
    @Test
    public void testPause() throws Exception {
        createWorkerTask();
        expectSuccessfulSends();
        expectSuccessfulFlushes();

        final CountDownLatch pauseLatch = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            pauseLatch.countDown();
            return null;
        }).when(statusListener).onPause(ArgumentMatchers.eq(taskId));

        startTaskThread();
        awaitPolls(2);

        workerTask.transitionTo(TargetState.PAUSED);
        final long pollsAtPauseRequest = pollCount();
        ConcurrencyUtils.awaitLatch(pauseLatch, "task did not pause in time");

        // One poll may already have been in flight when the pause was requested.
        final long pollsSincePause = pollCount() - pollsAtPauseRequest;
        Assert.assertTrue(pollsSincePause <= 1);

        awaitShutdown();

        Mockito.verify(statusListener).onPause(taskId);
        verifyTransactions(pollCount() + 2, pollCount() + 2);
        verifySends();
        verifyPossibleTopicCreation();

        // Shutting down must not have triggered additional polls either.
        final long pollsAfterShutdown = pollCount() - pollsAtPauseRequest;
        Assert.assertTrue(pollsAfterShutdown <= 1);

        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();
    }

    /** The task run ends in the failure shutdown path when the pre-producer check throws. */
    @Test
    public void testFailureInPreProducerCheck() throws Exception {
        createWorkerTask();
        final ConnectException fencingFailure = new ConnectException("Failed to perform zombie fencing");
        Mockito.doThrow(fencingFailure).when(preProducerCheck).run();

        workerTask.initialize(TASK_CONFIG);
        workerTask.run();

        Mockito.verify(preProducerCheck).run();
        verifyShutdown(true, false);
    }

    /** The task run ends in the failure shutdown path when {@code initTransactions()} throws. */
    @Test
    public void testFailureInProducerInitialization() throws Exception {
        createWorkerTask();
        final ConnectException initFailure = new ConnectException("You can't do that!");
        Mockito.doThrow(initFailure).when(producer).initTransactions();

        workerTask.initialize(TASK_CONFIG);
        workerTask.run();

        Mockito.verify(preProducerCheck).run();
        Mockito.verify(producer).initTransactions();
        verifyShutdown(true, false);
    }

    /** The task run ends in the failure shutdown path when the post-producer check throws. */
    @Test
    public void testFailureInPostProducerCheck() throws Exception {
        createWorkerTask();
        final ConnectException staleConfigFailure =
                new ConnectException("New task configs for the connector have already been generated");
        Mockito.doThrow(staleConfigFailure).when(postProducerCheck).run();

        workerTask.initialize(TASK_CONFIG);
        workerTask.run();

        Mockito.verify(preProducerCheck).run();
        Mockito.verify(producer).initTransactions();
        Mockito.verify(postProducerCheck).run();
        verifyShutdown(true, false);
    }

    /** The task run ends in the failure shutdown path when starting the offset store throws. */
    @Test
    public void testFailureInOffsetStoreStart() throws Exception {
        createWorkerTask();
        final ConnectException startFailure = new ConnectException("No soup for you!");
        Mockito.doThrow(startFailure).when(offsetStore).start();

        workerTask.initialize(TASK_CONFIG);
        workerTask.run();

        Mockito.verify(preProducerCheck).run();
        Mockito.verify(producer).initTransactions();
        Mockito.verify(postProducerCheck).run();
        Mockito.verify(offsetStore).start();
        verifyShutdown(true, false);
    }

    /**
     * Runs the task on the background thread for ten polls with successful sends and flushes,
     * then checks transactions, sends, lifecycle calls and metrics.
     */
    @Test
    public void testPollsInBackground() throws Exception {
        createWorkerTask();
        expectSuccessfulSends();
        expectSuccessfulFlushes();

        startTaskThread();
        awaitPolls(10);
        awaitShutdown();

        verifyTransactions(pollCount() + 1, pollCount() + 1);
        verifySends();
        verifyPossibleTopicCreation();
        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();

        assertPollMetrics(10);
        final int recordsPerBatch = RECORDS.size();
        assertTransactionMetrics(recordsPerBatch);
    }

    /** A {@code RuntimeException} thrown from {@code poll()} must lead to the failure shutdown path. */
    @Test
    public void testFailureInPoll() throws Exception {
        createWorkerTask();

        final CountDownLatch polled = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        Mockito.when(sourceTask.poll()).thenAnswer(invocation -> {
            polled.countDown();
            throw pollFailure;
        });

        startTaskThread();
        ConcurrencyUtils.awaitLatch(polled, "task was not polled in time");
        awaitShutdown(false);

        verifyPreflight();
        verifyStartup();
        verifyShutdown(true, false);
        assertPollMetrics(0);
    }

    /**
     * {@code poll()} blocks until the task has been cancelled and then throws; the test then
     * checks shutdown with {@code verifyShutdown(false, true)}.
     */
    @Test
    public void testFailureInPollAfterCancel() throws Exception {
        createWorkerTask();

        final CountDownLatch polled = new CountDownLatch(1);
        final CountDownLatch cancelled = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        Mockito.when(sourceTask.poll()).thenAnswer(invocation -> {
            polled.countDown();
            // Hold the poll open until the test thread has cancelled the task.
            ConcurrencyUtils.awaitLatch(cancelled, "task was not cancelled in time");
            throw pollFailure;
        });

        startTaskThread();
        ConcurrencyUtils.awaitLatch(polled, "task was not polled in time");
        workerTask.cancel();
        cancelled.countDown();
        awaitShutdown(false);

        verifyPreflight();
        verifyStartup();
        verifyShutdown(false, true);
        assertPollMetrics(0);
    }

    /**
     * {@code poll()} blocks until the task has been stopped and then throws; the test expects
     * no transactions and a clean shutdown.
     */
    @Test
    public void testFailureInPollAfterStop() throws Exception {
        createWorkerTask();

        final CountDownLatch polled = new CountDownLatch(1);
        final CountDownLatch stopped = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        Mockito.when(sourceTask.poll()).thenAnswer(invocation -> {
            polled.countDown();
            // Hold the poll open until the test thread has stopped the task.
            ConcurrencyUtils.awaitLatch(stopped, "task was not stopped in time");
            throw pollFailure;
        });

        startTaskThread();
        ConcurrencyUtils.awaitLatch(polled, "task was not polled in time");
        workerTask.stop();
        stopped.countDown();
        awaitShutdown(false);

        verifyTransactions(0, 0);
        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();
        assertPollMetrics(0);
    }

    /**
     * Runs the task while {@code poll()} returns empty batches and {@code beginFlush()}
     * reports {@code false}.
     */
    @Test
    public void testPollReturnsNoRecords() throws Exception {
        createWorkerTask();
        final List<SourceRecord> noRecords = Collections.emptyList();
        pollRecords.set(noRecords);
        Mockito.when(offsetWriter.beginFlush()).thenReturn(false);

        startTaskThread();
        awaitEmptyPolls(10);
        awaitShutdown();

        verifyTransactions(pollCount() + 1, 0);
        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();
        assertPollMetrics(0);
    }

    /** Runs ten polls with the connector configured for the {@code POLL} transaction boundary. */
    @Test
    public void testPollBasedCommit() throws Exception {
        final Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.POLL);
        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
        createWorkerTask();
        expectSuccessfulSends();
        expectSuccessfulFlushes();

        startTaskThread();
        awaitPolls(10);
        awaitShutdown();

        verifyTransactions(pollCount() + 1, pollCount() + 1);
        verifySends();
        verifyPossibleTopicCreation();
        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();

        assertPollMetrics(1);
        final int recordsPerBatch = RECORDS.size();
        assertTransactionMetrics(recordsPerBatch);
    }

    /**
     * With the {@code INTERVAL} transaction boundary and a mock clock, offsets must be flushed
     * only once the configured interval has elapsed on that clock.
     */
    @Test
    public void testIntervalBasedCommit() throws Exception {
        final long commitInterval = 618L;
        final Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.INTERVAL);
        connectorProps.put("transaction.boundary.interval.ms", Long.toString(commitInterval));
        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
        // Replace the system clock so the test decides when the interval elapses.
        time = new MockTime();

        createWorkerTask();
        expectSuccessfulSends();
        expectSuccessfulFlushes();
        startTaskThread();

        awaitPolls(2);
        Assert.assertEquals("No flushes should have taken place before offset commit interval has elapsed", 0L, flushCount());

        time.sleep(commitInterval);
        awaitPolls(2);
        Assert.assertEquals("One flush should have taken place after offset commit interval has elapsed", 1L, flushCount());

        time.sleep(commitInterval * 2);
        awaitPolls(2);
        Assert.assertEquals("Two flushes should have taken place after offset commit interval has elapsed again", 2L, flushCount());

        awaitShutdown();

        verifyTransactions(3, 3);
        verifySends();
        verifyPossibleTopicCreation();
        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();

        assertPollMetrics(2);
        assertTransactionMetrics(RECORDS.size() * 2);
    }

    /** Connector-defined boundary: the connector requests a commit without naming a record. */
    @Test
    public void testConnectorCommitOnBatch() throws Exception {
        testConnectorBasedCommit(TransactionContext::commitTransaction, false);
    }

    /** Connector-defined boundary: the connector requests a commit on {@code SOURCE_RECORD_2}. */
    @Test
    public void testConnectorCommitOnRecord() throws Exception {
        testConnectorBasedCommit(context -> context.commitTransaction(SOURCE_RECORD_2), false);
    }

    /** Connector-defined boundary: the connector requests an abort without naming a record. */
    @Test
    public void testConnectorAbortOnBatch() throws Exception {
        testConnectorBasedCommit(TransactionContext::abortTransaction, true);
    }

    /** Connector-defined boundary: the connector requests an abort on {@code SOURCE_RECORD_2}. */
    @Test
    public void testConnectorAbortOnRecord() throws Exception {
        testConnectorBasedCommit(context -> context.abortTransaction(SOURCE_RECORD_2), true);
    }

    /**
     * Shared scenario for the connector-defined transaction boundary tests.
     *
     * @param consumer action applied to the task's transaction context to request the
     *                 commit or abort
     * @param z        {@code true} when the requested action is an abort; changes the expected
     *                 producer calls and transaction metrics
     */
    private void testConnectorBasedCommit(Consumer<TransactionContext> consumer, boolean z) throws Exception {
        final Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
        createWorkerTask();
        expectSuccessfulSends();
        expectSuccessfulFlushes();
        final WorkerTransactionContext context = workerTask.sourceTaskContext.transactionContext();

        startTaskThread();

        awaitPolls(3);
        Assert.assertEquals("No flushes should have taken place without connector requesting transaction commit", 0L, flushCount());

        consumer.accept(context);
        awaitPolls(3);
        Assert.assertEquals("One flush should have taken place after transaction commit/abort was requested", 1L, flushCount());

        awaitPolls(3);
        Assert.assertEquals("Only one flush should still have taken place without connector re-requesting commit/abort, even on identical records", 1L, flushCount());

        awaitShutdown();
        Assert.assertEquals("Task should have flushed offsets once based on connector-defined boundaries, and skipped final end-of-life offset commit", 1L, flushCount());

        final int expectedBegins = z ? 3 : 2;
        Mockito.verify(producer, Mockito.times(expectedBegins)).beginTransaction();
        verifySends();
        if (z) {
            Mockito.verify(producer).abortTransaction();
        }
        Mockito.verify(producer).commitTransaction();

        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();
        verifyPossibleTopicCreation();

        assertPollMetrics(1);
        final int expectedCommittedRecords = z ? 0 : 3 * RECORDS.size();
        assertTransactionMetrics(expectedCommittedRecords);
    }

    /**
     * Requests a commit and then an abort through the transaction context and expects exactly
     * one begun and committed transaction, with the producer's {@code abortTransaction()}
     * never invoked.
     */
    @Test
    public void testConnectorAbortsEmptyTransaction() throws Exception {
        final Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
        createWorkerTask();
        expectPossibleTopicCreation();
        expectTaskGetTopic();
        expectApplyTransformationChain();
        expectConvertHeadersAndKeyValue();
        final WorkerTransactionContext context = workerTask.sourceTaskContext.transactionContext();

        startTaskThread();
        awaitPolls(1);
        awaitEmptyPolls(1);

        context.commitTransaction();
        awaitEmptyPolls(1);

        context.abortTransaction();
        awaitEmptyPolls(1);

        awaitShutdown(true);

        Mockito.verify(producer, Mockito.times(1)).beginTransaction();
        Mockito.verify(producer, Mockito.times(1)).commitTransaction();
        Mockito.verify(producer, Mockito.atLeast(RECORDS.size())).send(ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.verify(producer, Mockito.never()).abortTransaction();

        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();
        verifyPossibleTopicCreation();
    }

    /**
     * Requests a batch abort and a commit on the last record before sending one batch
     * directly through {@code sendRecords()}; the producer must never abort.
     */
    @Test
    public void testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch() throws Exception {
        final Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
        createWorkerTask();
        expectPossibleTopicCreation();
        expectTaskGetTopic();
        expectApplyTransformationChain();
        expectConvertHeadersAndKeyValue();
        Mockito.when(offsetWriter.beginFlush()).thenReturn(true).thenReturn(false);

        workerTask.initialize(TASK_CONFIG);
        final WorkerTransactionContext context = workerTask.sourceTaskContext.transactionContext();

        context.abortTransaction();
        final SourceRecord lastRecord = RECORDS.get(RECORDS.size() - 1);
        context.commitTransaction(lastRecord);

        workerTask.toSend = RECORDS;
        final boolean allSent = workerTask.sendRecords();
        Assert.assertTrue(allSent);

        verifyTransactions(2, 1);
        verifySends(RECORDS.size());
        Mockito.verify(producer, Mockito.never()).abortTransaction();
        verifyPossibleTopicCreation();
    }

    /**
     * Requests an abort on the last record and a batch commit before sending one batch
     * directly through {@code sendRecords()}; expects one commit and one abort on the producer.
     */
    @Test
    public void testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch() throws Exception {
        final Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
        createWorkerTask();
        expectPossibleTopicCreation();
        expectTaskGetTopic();
        expectApplyTransformationChain();
        expectConvertHeadersAndKeyValue();
        Mockito.when(offsetWriter.beginFlush()).thenReturn(true).thenReturn(false);

        workerTask.initialize(TASK_CONFIG);
        final WorkerTransactionContext context = workerTask.sourceTaskContext.transactionContext();

        final SourceRecord lastRecord = RECORDS.get(RECORDS.size() - 1);
        context.abortTransaction(lastRecord);
        context.commitTransaction();

        workerTask.toSend = RECORDS;
        final boolean allSent = workerTask.sendRecords();
        Assert.assertTrue(allSent);

        Mockito.verify(offsetWriter, Mockito.times(2)).beginFlush();
        Mockito.verify(sourceTask, Mockito.times(2)).commit();
        Mockito.verify(producer, Mockito.times(2)).beginTransaction();
        Mockito.verify(producer, Mockito.times(1)).commitTransaction();
        Mockito.verify(offsetWriter, Mockito.times(1)).doFlush(ArgumentMatchers.any());
        Mockito.verify(producer, Mockito.times(1)).abortTransaction();
        verifySends(RECORDS.size());
        verifyPossibleTopicCreation();
    }

    /**
     * The offset flush callback is completed with an error synchronously, inside the
     * {@code doFlush} call itself.
     */
    @Test
    public void testCommitFlushSyncCallbackFailure() throws Exception {
        final RecordTooLargeException flushFailure = new RecordTooLargeException();
        Mockito.when(offsetWriter.beginFlush()).thenReturn(true);
        Mockito.when(offsetWriter.doFlush(ArgumentMatchers.any())).thenAnswer(invocation -> {
            final Callback<?> flushCallback = invocation.getArgument(0);
            flushCallback.onCompletion(flushFailure, null);
            return null;
        });
        testCommitFailure(flushFailure, false);
    }

    /**
     * The offset flush callback is captured during {@code doFlush} and completed with an error
     * later, when the producer's {@code commitTransaction()} is invoked.
     */
    @Test
    public void testCommitFlushAsyncCallbackFailure() throws Exception {
        final RecordTooLargeException flushFailure = new RecordTooLargeException();
        Mockito.when(offsetWriter.beginFlush()).thenReturn(true);

        // Typed holder (instead of a raw AtomicReference) for the callback passed to doFlush.
        final AtomicReference<Callback<?>> flushCallback = new AtomicReference<>();
        Mockito.when(offsetWriter.doFlush(ArgumentMatchers.any())).thenAnswer(invocation -> {
            flushCallback.set(invocation.getArgument(0));
            return null;
        });
        Mockito.doAnswer(invocation -> {
            flushCallback.get().onCompletion(flushFailure, null);
            return null;
        }).when(producer).commitTransaction();

        testCommitFailure(flushFailure, true);
    }

    /** The producer's {@code commitTransaction()} itself throws. */
    @Test
    public void testCommitTransactionFailure() throws Exception {
        final RecordTooLargeException commitFailure = new RecordTooLargeException();
        Mockito.when(offsetWriter.beginFlush()).thenReturn(true);
        Mockito.doThrow(commitFailure).when(producer).commitTransaction();
        testCommitFailure(commitFailure, true);
    }

    /**
     * Shared scenario for the commit failure tests: runs the task until the status listener
     * reports a failure and checks that the failure's cause is the expected exception.
     *
     * @param exc the exception expected as the cause of the reported failure
     * @param z   {@code true} when the producer's {@code commitTransaction()} is expected to
     *            have been invoked
     */
    private void testCommitFailure(Exception exc, boolean z) throws Exception {
        createWorkerTask();

        final CountDownLatch failureLatch = new CountDownLatch(1);
        // Held as Throwable so that a non-Exception failure is recorded rather than causing a
        // ClassCastException inside the stubbed listener.
        final AtomicReference<Throwable> reportedFailure = new AtomicReference<>();
        Mockito.doAnswer(invocation -> {
            // Publish the failure before releasing the latch so a woken waiter always sees it.
            reportedFailure.set(invocation.<Throwable>getArgument(1));
            failureLatch.countDown();
            return null;
        }).when(statusListener).onFailure(ArgumentMatchers.eq(taskId), ArgumentMatchers.any());

        expectSuccessfulSends();
        startTaskThread();
        ConcurrencyUtils.awaitLatch(failureLatch, "task did not fail in time");
        awaitShutdown();

        Assert.assertEquals(exc, reportedFailure.get().getCause());
        verifySends();
        verifyPossibleTopicCreation();
        Mockito.verify(producer).beginTransaction();
        if (z) {
            Mockito.verify(producer).commitTransaction();
        }
        Mockito.verify(offsetWriter).cancelFlush();
        verifyPreflight();
        verifyStartup();
        verifyShutdown(true, false);
    }

    /**
     * Exercises {@code sendRecords()} when the producer fails synchronously part-way through
     * a batch.
     *
     * <p>The producer is stubbed to succeed, then throw a {@code TimeoutException}, then
     * succeed again. The first {@code sendRecords()} call must return {@code false} and leave
     * the second and third records queued without committing; the second call must return
     * {@code true}, clear the queue and commit exactly once. Offsets for all three records
     * must have been handed to the offset writer.
     */
    @Test
    public void testSendRecordsRetries() {
        createWorkerTask();
        SourceRecord first = new SourceRecord(PARTITION, offset(1), "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
        SourceRecord second = new SourceRecord(PARTITION, offset(2), "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
        SourceRecord third = new SourceRecord(PARTITION, offset(3), "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
        expectPossibleTopicCreation();
        expectTaskGetTopic();
        expectApplyTransformationChain();
        expectConvertHeadersAndKeyValue();
        this.workerTask.toSend = Arrays.asList(first, second, third);

        // Send outcomes, in order: success, synchronous failure, success.
        OngoingStubbing<Future<RecordMetadata>> sendStub = Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(), (org.apache.kafka.clients.producer.Callback) ArgumentMatchers.any()));
        sendStub = expectSuccessfulSend(sendStub);
        sendStub = expectSynchronousFailedSend(sendStub, new TimeoutException("retriable sync failure"));
        expectSuccessfulSend(sendStub);

        // First attempt: stops at the failed record and leaves the remainder queued.
        boolean firstAttemptFinished = this.workerTask.sendRecords();
        Assert.assertFalse(firstAttemptFinished);
        Assert.assertEquals(Arrays.asList(second, third), this.workerTask.toSend);
        Producer producerBegun = (Producer) Mockito.verify(this.producer);
        producerBegun.beginTransaction();
        Producer producerNotCommitted = (Producer) Mockito.verify(this.producer, Mockito.never());
        producerNotCommitted.commitTransaction();
        verifySends(2);
        verifyPossibleTopicCreation();

        // Second attempt: drains the queue and commits.
        boolean secondAttemptFinished = this.workerTask.sendRecords();
        Assert.assertTrue(secondAttemptFinished);
        Assert.assertNull(this.workerTask.toSend);
        Producer producerCommittedOnce = (Producer) Mockito.verify(this.producer, Mockito.times(1));
        producerCommittedOnce.commitTransaction();
        verifySends(4);
        for (int recordNumber = 1; recordNumber <= 3; recordNumber++) {
            ((OffsetStorageWriter) Mockito.verify(this.offsetWriter)).offset(PARTITION, offset(recordNumber));
        }
    }

    /**
     * Checks that {@code sendRecords()} surfaces a {@link ConnectException} when the very
     * first producer send throws a {@link KafkaException}.
     *
     * <p>Afterwards a transaction must have been begun, exactly one send attempted, and no
     * offset recorded with the offset writer.
     */
    @Test
    public void testSendRecordsProducerSendFailsImmediately() {
        createWorkerTask();
        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
        expectPossibleTopicCreation();
        expectConvertHeadersAndKeyValue();
        expectApplyTransformationChain();
        Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(), (org.apache.kafka.clients.producer.Callback) ArgumentMatchers.any()))
                .thenThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException("topic")));
        this.workerTask.toSend = Arrays.asList(first, second);

        // A bound method reference evaluates and null-checks its receiver when it is created.
        Assert.assertThrows(ConnectException.class, this.workerTask::sendRecords);

        Producer producerBegun = (Producer) Mockito.verify(this.producer);
        producerBegun.beginTransaction();
        Producer producerSentOnce = (Producer) Mockito.verify(this.producer);
        producerSentOnce.send((ProducerRecord) ArgumentMatchers.any(), (org.apache.kafka.clients.producer.Callback) ArgumentMatchers.any());
        OffsetStorageWriter writerUntouched = (OffsetStorageWriter) Mockito.verify(this.offsetWriter, Mockito.never());
        writerUntouched.offset((Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
        verifyPossibleTopicCreation();
    }

    /**
     * Checks that a task stopped while its {@code start()} call is still blocked shuts down
     * cleanly without ever polling.
     *
     * <p>{@code SourceTask.start} is stubbed to signal that it was entered and then block
     * until the test releases it. The test requests a stop while start is blocked, releases
     * it, and waits for the task to finish without issuing a second stop.
     */
    @Test
    public void testSlowTaskStart() throws Exception {
        CountDownLatch startEntered = new CountDownLatch(1);
        CountDownLatch startReleased = new CountDownLatch(1);
        createWorkerTask();
        SourceTask blockingTask = (SourceTask) Mockito.doAnswer(invocation -> {
            startEntered.countDown();
            ConcurrencyUtils.awaitLatch(startReleased, "task was not allowed to finish startup in time");
            return null;
        }).when(this.sourceTask);
        blockingTask.start((Map) ArgumentMatchers.eq(TASK_PROPS));
        Mockito.when(this.offsetWriter.beginFlush()).thenReturn(false);

        startTaskThread();
        ConcurrencyUtils.awaitLatch(startEntered, "task did not start up in time");
        // Request the stop while start() is still blocked, then let start() return.
        this.workerTask.stop();
        startReleased.countDown();
        awaitShutdown(false);

        SourceTask neverPolled = (SourceTask) Mockito.verify(this.sourceTask, Mockito.never());
        neverPolled.poll();
        verifyTransactions(1, 0);
        verifySends(0);
        verifyPreflight();
        verifyStartup();
        verifyCleanShutdown();
    }

    /**
     * Checks that cancelling a freshly created task closes the offset reader and closes the
     * producer with a zero timeout.
     */
    @Test
    public void testCancel() {
        createWorkerTask();
        this.workerTask.cancel();

        CloseableOffsetStorageReader readerClosed = (CloseableOffsetStorageReader) Mockito.verify(this.offsetReader);
        readerClosed.close();
        Producer producerClosed = (Producer) Mockito.verify(this.producer);
        producerClosed.close(Duration.ZERO);
    }

    /**
     * Counts the no-argument {@code poll} invocations recorded on the source task mock.
     *
     * @return the invocation count, narrowed to {@code int}
     */
    private int pollCount() {
        final int polls = (int) MockitoUtils.countInvocations(this.sourceTask, "poll", new Class[0]);
        return polls;
    }

    /**
     * Counts the {@code doFlush(Callback)} invocations recorded on the offset writer mock.
     *
     * @return the invocation count, narrowed to {@code int}
     */
    private int flushCount() {
        final int flushes = (int) MockitoUtils.countInvocations(this.offsetWriter, "doFlush", Callback.class);
        return flushes;
    }

    /**
     * Publishes the records to hand out on poll, installs a fresh latch of size {@code i},
     * and waits on the latch currently held in {@code pollLatch}.
     *
     * @param i    the count the new latch starts from
     * @param list the records stored in {@code pollRecords}
     */
    private void awaitPolls(int i, List<SourceRecord> list) {
        this.pollRecords.set(list);
        CountDownLatch freshLatch = new CountDownLatch(i);
        this.pollLatch.set(freshLatch);
        // Re-read the latch from the shared reference rather than reusing the local.
        CountDownLatch awaited = this.pollLatch.get();
        String timeoutMessage = "task was not polled " + i + " time(s) quickly enough";
        ConcurrencyUtils.awaitLatch(awaited, timeoutMessage);
    }

    /**
     * Waits for {@code i} polls while an empty record list is published for polling.
     *
     * @param i the number of polls to wait for
     */
    private void awaitEmptyPolls(int i) throws InterruptedException {
        List<SourceRecord> noRecords = Collections.emptyList();
        awaitPolls(i, noRecords);
    }

    /**
     * Waits for {@code i} polls while the shared {@code RECORDS} batch is published for polling.
     *
     * @param i the number of polls to wait for
     */
    private void awaitPolls(int i) throws InterruptedException {
        List<SourceRecord> defaultBatch = RECORDS;
        awaitPolls(i, defaultBatch);
    }

    /**
     * Stops the task and waits for it to finish; shorthand for {@code awaitShutdown(true)}.
     */
    private void awaitShutdown() throws InterruptedException, ExecutionException {
        final boolean requestStop = true;
        awaitShutdown(requestStop);
    }

    /**
     * Waits for the worker task to stop and for its submitted future to complete.
     *
     * @param z when {@code true}, {@code stop()} is called on the task before waiting
     * @throws InterruptedException if interrupted while waiting on the future
     * @throws ExecutionException   if the task's run threw
     */
    private void awaitShutdown(boolean z) throws InterruptedException, ExecutionException {
        if (z) {
            this.workerTask.stop();
        }
        boolean stoppedInTime = this.workerTask.awaitStop(1000L);
        Assert.assertTrue(stoppedInTime);
        // Surfaces any exception thrown by the task's run as an ExecutionException.
        this.workerTaskFuture.get();
    }

    /**
     * Initializes the worker task with {@code TASK_CONFIG} and submits it to the executor,
     * keeping the resulting future in {@code workerTaskFuture}.
     */
    private void startTaskThread() {
        this.workerTask.initialize(TASK_CONFIG);
        Runnable taskBody = (Runnable) this.workerTask;
        this.workerTaskFuture = this.executor.submit(taskBody);
    }

    /**
     * Stubs the offset writer so that {@code beginFlush()} returns {@code true} and
     * {@code doFlush} immediately completes the supplied callback with no error and no result.
     */
    private void expectSuccessfulFlushes() throws InterruptedException, java.util.concurrent.TimeoutException {
        Mockito.when(this.offsetWriter.beginFlush()).thenReturn(true);
        Mockito.when(this.offsetWriter.doFlush((Callback) ArgumentMatchers.any())).thenAnswer(invocation -> {
            Callback flushCallback = (Callback) invocation.getArgument(0);
            flushCallback.onCompletion((Throwable) null, (Object) null);
            return null;
        });
    }

    /**
     * Installs the stubs for a send that succeeds: converters, the transformation chain,
     * a successful producer send, the topic-status lookup, and (when enabled) topic creation.
     */
    private void expectSuccessfulSends() {
        expectConvertHeadersAndKeyValue();
        expectApplyTransformationChain();
        OngoingStubbing<Future<RecordMetadata>> sendStub = Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(), (org.apache.kafka.clients.producer.Callback) ArgumentMatchers.any()));
        expectSuccessfulSend(sendStub);
        expectTaskGetTopic();
        expectPossibleTopicCreation();
    }

    /**
     * Appends a successful-send answer to a producer {@code send} stubbing: the producer
     * callback (argument 1) is completed with metadata for partition 0 of {@code "topic"}
     * and no exception, and the stubbed call itself returns {@code null}.
     *
     * @param ongoingStubbing the stubbing to extend
     * @return the stubbing, so further outcomes can be chained
     */
    private OngoingStubbing<Future<RecordMetadata>> expectSuccessfulSend(OngoingStubbing<Future<RecordMetadata>> ongoingStubbing) {
        return ongoingStubbing.thenAnswer(invocation -> {
            org.apache.kafka.clients.producer.Callback sendCallback = (org.apache.kafka.clients.producer.Callback) invocation.getArgument(1);
            RecordMetadata acknowledgement = new RecordMetadata(new TopicPartition("topic", 0), 0L, 0, 0L, 0, 0);
            sendCallback.onCompletion(acknowledgement, (Exception) null);
            return null;
        });
    }

    /**
     * Appends an outcome to a producer {@code send} stubbing in which the call itself throws.
     *
     * @param ongoingStubbing the stubbing to extend
     * @param exc             the exception the stubbed call throws
     * @return the stubbing, so further outcomes can be chained
     */
    private OngoingStubbing<Future<RecordMetadata>> expectSynchronousFailedSend(OngoingStubbing<Future<RecordMetadata>> ongoingStubbing, Exception exc) {
        OngoingStubbing<Future<RecordMetadata>> extended = ongoingStubbing.thenThrow(exc);
        return extended;
    }

    /**
     * Stubs the header, key and value converters for records sent to {@code "topic"}.
     *
     * <p>The key converter returns {@code SERIALIZED_KEY} for {@code KEY} under
     * {@code KEY_SCHEMA}; the value converter returns {@code SERIALIZED_RECORD} for
     * {@code VALUE_1} under {@code RECORD_SCHEMA}. Both are matched against an empty
     * header collection.
     */
    private void expectConvertHeadersAndKeyValue() {
        // Declared via the Headers interface: RecordHeaders is not a generic class and cannot
        // take a type argument.
        Headers recordHeaders = new RecordHeaders();
        // The collection is empty here, so this loop installs no header-converter stubs; it is
        // kept so that headers added above would be stubbed automatically.
        for (Header header : recordHeaders) {
            Mockito.when(this.headerConverter.fromConnectHeader((String) ArgumentMatchers.eq("topic"), (String) ArgumentMatchers.eq(header.key()), (Schema) ArgumentMatchers.eq(Schema.STRING_SCHEMA), ArgumentMatchers.eq(new String(header.value())))).thenReturn(header.value());
        }
        Mockito.when(this.keyConverter.fromConnectData((String) ArgumentMatchers.eq("topic"), (Headers) ArgumentMatchers.eq(recordHeaders), (Schema) ArgumentMatchers.eq(KEY_SCHEMA), ArgumentMatchers.eq(KEY))).thenReturn(SERIALIZED_KEY);
        Mockito.when(this.valueConverter.fromConnectData((String) ArgumentMatchers.eq("topic"), (Headers) ArgumentMatchers.eq(recordHeaders), (Schema) ArgumentMatchers.eq(RECORD_SCHEMA), ArgumentMatchers.eq(VALUE_1))).thenReturn(SERIALIZED_RECORD);
    }

    /**
     * Stubs the transformation chain as an identity function: {@code apply} returns the
     * record it was given.
     */
    private void expectApplyTransformationChain() {
        Mockito.when(this.transformationChain.apply((SourceRecord) ArgumentMatchers.any()))
                .thenAnswer(invocation -> invocation.getArgument(0));
    }

    /**
     * Stubs {@code statusBackingStore.getTopic} for topic {@code "topic"} and any connector
     * name, answering with a {@code TopicStatus} built from the call's arguments, task
     * number 0, and the current reading of {@code time}.
     */
    private void expectTaskGetTopic() {
        Mockito.when(this.statusBackingStore.getTopic((String) ArgumentMatchers.any(), (String) ArgumentMatchers.eq("topic"))).thenAnswer(invocation -> {
            String topicName = (String) invocation.getArgument(1);
            String connectorName = (String) invocation.getArgument(0);
            ConnectorTaskId owner = new ConnectorTaskId(connectorName, 0);
            long discoveredAt = this.time.milliseconds();
            return new TopicStatus(topicName, owner, discoveredAt);
        });
    }

    /**
     * When the config reports topic creation as enabled, stubs the admin client so that
     * {@code createOrFindTopics} reports {@code "topic"} in the first set of the response
     * and nothing in the second. Otherwise installs no stub.
     */
    private void expectPossibleTopicCreation() {
        if (!this.config.topicCreationEnable()) {
            return;
        }
        Mockito.when(this.admin.createOrFindTopics((NewTopic[]) ArgumentMatchers.any()))
                .thenReturn(new TopicAdmin.TopicCreationResponse(Collections.singleton("topic"), Collections.emptySet()));
    }

    /**
     * Verifies the interactions expected before the task starts: the pre-producer check ran,
     * the producer initialized transactions, the post-producer check ran, and the offset
     * store was started. Each is verified individually; relative order is not checked.
     */
    private void verifyPreflight() {
        Runnable preCheck = (Runnable) Mockito.verify(this.preProducerCheck);
        preCheck.run();
        Producer producerInitialized = (Producer) Mockito.verify(this.producer);
        producerInitialized.initTransactions();
        Runnable postCheck = (Runnable) Mockito.verify(this.postProducerCheck);
        postCheck.run();
        ConnectorOffsetBackingStore storeStarted = (ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore);
        storeStarted.start();
    }

    /**
     * Verifies that the source task was initialized and started with {@code TASK_PROPS} and
     * that the status listener was told about the startup.
     *
     * <p>Also sets {@code taskStarted}, which {@code verifyShutdown} consults; the flag is
     * set only after the {@code start} verification has passed.
     */
    private void verifyStartup() {
        SourceTask initialized = (SourceTask) Mockito.verify(this.sourceTask);
        initialized.initialize((SourceTaskContext) ArgumentMatchers.any());
        SourceTask started = (SourceTask) Mockito.verify(this.sourceTask);
        started.start(TASK_PROPS);
        this.taskStarted = true;
        TaskStatus.Listener listenerNotified = (TaskStatus.Listener) Mockito.verify(this.statusListener);
        listenerNotified.onStartup(this.taskId);
    }

    /**
     * Verifies a shutdown with both {@code verifyShutdown} flags off: no failure reported
     * and single closes expected.
     */
    private void verifyCleanShutdown() throws Exception {
        final boolean expectFailure = false;
        final boolean expectDoubleClose = false;
        verifyShutdown(expectFailure, expectDoubleClose);
    }

    /**
     * Verifies the resource cleanup and status reporting expected when the task shuts down.
     *
     * @param z  when {@code true}, expects {@code onFailure} on the status listener;
     *           otherwise {@code onShutdown} is expected unless {@code z2} is set
     * @param z2 when {@code true}, expects the producer and the offset reader to each have
     *           been closed twice, one producer close being with {@code Duration.ZERO},
     *           and skips the {@code onShutdown} check
     * @throws Exception declared because the verified mock methods may declare checked exceptions
     */
    private void verifyShutdown(boolean z, boolean z2) throws Exception {
        if (z2) {
            Producer closedWithZero = (Producer) Mockito.verify(this.producer);
            closedWithZero.close(Duration.ZERO);
            Producer closedTwice = (Producer) Mockito.verify(this.producer, Mockito.times(2));
            closedTwice.close((Duration) ArgumentMatchers.any());
            CloseableOffsetStorageReader readerClosedTwice = (CloseableOffsetStorageReader) Mockito.verify(this.offsetReader, Mockito.times(2));
            readerClosedTwice.close();
        } else {
            Producer closedOnce = (Producer) Mockito.verify(this.producer);
            closedOnce.close((Duration) ArgumentMatchers.any());
            CloseableOffsetStorageReader readerClosedOnce = (CloseableOffsetStorageReader) Mockito.verify(this.offsetReader);
            readerClosedOnce.close();
        }

        // Resources released the same way on every shutdown path.
        ConnectorOffsetBackingStore storeStopped = (ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore);
        storeStopped.stop();
        TopicAdmin adminClosed = (TopicAdmin) Mockito.verify(this.admin);
        adminClosed.close((Duration) ArgumentMatchers.any());
        TransformationChain chainClosed = (TransformationChain) Mockito.verify(this.transformationChain);
        chainClosed.close();
        HeaderConverter headerConverterClosed = (HeaderConverter) Mockito.verify(this.headerConverter);
        headerConverterClosed.close();

        // stop() is only expected once verifyStartup has confirmed the task was started.
        if (this.taskStarted) {
            SourceTask taskStopped = (SourceTask) Mockito.verify(this.sourceTask);
            taskStopped.stop();
        }

        if (z) {
            TaskStatus.Listener failureReported = (TaskStatus.Listener) Mockito.verify(this.statusListener);
            failureReported.onFailure((ConnectorTaskId) ArgumentMatchers.eq(this.taskId), (Throwable) ArgumentMatchers.any());
        } else if (!z2) {
            TaskStatus.Listener shutdownReported = (TaskStatus.Listener) Mockito.verify(this.statusListener);
            shutdownReported.onShutdown(this.taskId);
        }
    }

    /**
     * Verifies that {@code createOrFindTopics} was called on the admin client exactly once
     * when {@code enableTopicCreation} is set, and never otherwise.
     */
    private void verifyPossibleTopicCreation() {
        // times(1) is the mode Mockito.verify(mock) applies by default.
        VerificationMode expectedCalls = this.enableTopicCreation ? Mockito.times(1) : Mockito.never();
        TopicAdmin verifiedAdmin = (TopicAdmin) Mockito.verify(this.admin, expectedCalls);
        verifiedAdmin.createOrFindTopics((NewTopic[]) ArgumentMatchers.any());
    }

    /**
     * Verifies that the producer sent one record per element of {@code RECORDS} for every
     * poll counted on the source task.
     */
    private void verifySends() {
        int expectedSends = pollCount() * RECORDS.size();
        verifySends(expectedSends);
    }

    /**
     * Verifies that {@code send(record, callback)} was invoked on the producer exactly
     * {@code i} times, with any arguments.
     *
     * @param i the exact number of sends expected
     */
    private void verifySends(int i) {
        Producer verifiedProducer = (Producer) Mockito.verify(this.producer, Mockito.times(i));
        verifiedProducer.send((ProducerRecord) ArgumentMatchers.any(), (org.apache.kafka.clients.producer.Callback) ArgumentMatchers.any());
    }

    /**
     * Verifies exact invocation counts for the commit-related interactions.
     *
     * @param i  expected number of {@code offsetWriter.beginFlush()} and
     *           {@code sourceTask.commit()} calls
     * @param i2 expected number of {@code producer.beginTransaction()},
     *           {@code producer.commitTransaction()} and {@code offsetWriter.doFlush(...)} calls
     * @throws InterruptedException declared by the verified mock methods
     */
    private void verifyTransactions(int i, int i2) throws InterruptedException {
        VerificationMode commitAttempts = Mockito.times(i);
        OffsetStorageWriter flushBegun = (OffsetStorageWriter) Mockito.verify(this.offsetWriter, commitAttempts);
        flushBegun.beginFlush();
        SourceTask taskCommitted = (SourceTask) Mockito.verify(this.sourceTask, commitAttempts);
        taskCommitted.commit();

        VerificationMode producerTransactions = Mockito.times(i2);
        Producer transactionBegun = (Producer) Mockito.verify(this.producer, producerTransactions);
        transactionBegun.beginTransaction();
        Producer transactionCommitted = (Producer) Mockito.verify(this.producer, producerTransactions);
        transactionCommitted.commitTransaction();
        OffsetStorageWriter flushed = (OffsetStorageWriter) Mockito.verify(this.offsetWriter, producerTransactions);
        flushed.doFlush((Callback) ArgumentMatchers.any());
    }

    /**
     * Checks the min/max/avg transaction-size metrics of the worker task.
     *
     * <p>The minimum must be non-negative and the maximum at least {@code i}. When min and
     * max are within 1e-6 of each other the average must match the maximum (tolerance 2e-6);
     * otherwise the average must lie strictly between them.
     *
     * @param i lower bound expected for the maximum transaction size
     */
    private void assertTransactionMetrics(int i) {
        ConnectMetrics.MetricGroup transactionGroup = this.workerTask.transactionMetricsGroup().metricGroup();
        double sizeMin = this.metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-min");
        double sizeMax = this.metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-max");
        double sizeAvg = this.metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-avg");
        Assert.assertTrue(sizeMin >= 0.0d);
        Assert.assertTrue(sizeMax >= ((double) i));

        // Keep the comparison as "<=" so a NaN difference still falls into the range checks.
        boolean minAndMaxCoincide = sizeMax - sizeMin <= 1.0E-6d;
        if (!minAndMaxCoincide) {
            Assert.assertTrue("Average transaction size should be greater than minimum transaction size", sizeAvg > sizeMin);
            Assert.assertTrue("Average transaction size should be less than maximum transaction size", sizeAvg < sizeMax);
        } else {
            Assert.assertEquals(sizeMax, sizeAvg, 2.0E-6d);
        }
    }

    /**
     * Checks the source-task and task-level metrics against a minimum expected poll count.
     *
     * <p>With {@code i > 0}: batch-size max/avg must equal {@code RECORDS.size()}, the poll
     * and write rates must be positive, the max poll-batch time must be non-negative, and the
     * max active-record count must equal {@code RECORDS.size()}. With {@code i <= 0} the poll
     * and write rates must be exactly zero. In all cases the poll and write totals must be at
     * least {@code i}, the average poll-batch time must be NaN or positive, and the current
     * active-record count must be zero.
     *
     * @param i minimum number of polled/written records expected
     */
    private void assertPollMetrics(int i) {
        ConnectMetrics.MetricGroup sourceGroup = this.workerTask.sourceTaskMetricsGroup().metricGroup();
        ConnectMetrics.MetricGroup taskGroup = this.workerTask.taskMetricsGroup().metricGroup();
        final boolean activityExpected = i > 0;
        final double minimumTotal = (double) i;

        // Poll rate/total and batch sizes.
        double pollRate = this.metrics.currentMetricValueAsDouble(sourceGroup, "source-record-poll-rate");
        double pollTotal = this.metrics.currentMetricValueAsDouble(sourceGroup, "source-record-poll-total");
        if (activityExpected) {
            Assert.assertEquals(RECORDS.size(), this.metrics.currentMetricValueAsDouble(taskGroup, "batch-size-max"), 1.0E-6d);
            Assert.assertEquals(RECORDS.size(), this.metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 1.0E-6d);
            Assert.assertTrue(pollRate > 0.0d);
        } else {
            Assert.assertEquals(0.0d, pollRate, 0.0d);
        }
        Assert.assertTrue(pollTotal >= minimumTotal);

        // Write rate/total.
        double writeRate = this.metrics.currentMetricValueAsDouble(sourceGroup, "source-record-write-rate");
        double writeTotal = this.metrics.currentMetricValueAsDouble(sourceGroup, "source-record-write-total");
        if (activityExpected) {
            Assert.assertTrue(writeRate > 0.0d);
        } else {
            Assert.assertEquals(0.0d, writeRate, 0.0d);
        }
        Assert.assertTrue(writeTotal >= minimumTotal);

        // Poll batch timings.
        double batchTimeMax = this.metrics.currentMetricValueAsDouble(sourceGroup, "poll-batch-max-time-ms");
        double batchTimeAvg = this.metrics.currentMetricValueAsDouble(sourceGroup, "poll-batch-avg-time-ms");
        if (activityExpected) {
            Assert.assertTrue(batchTimeMax >= 0.0d);
        }
        Assert.assertTrue(Double.isNaN(batchTimeAvg) || batchTimeAvg > 0.0d);

        // Active (in-flight) record counts.
        double activeNow = this.metrics.currentMetricValueAsDouble(sourceGroup, "source-record-active-count");
        double activeMax = this.metrics.currentMetricValueAsDouble(sourceGroup, "source-record-active-count-max");
        Assert.assertEquals(0.0d, activeNow, 1.0E-6d);
        if (activityExpected) {
            Assert.assertEquals(RECORDS.size(), activeMax, 1.0E-6d);
        }
    }

    // Static fixture setup. Statement order matters: TASK_CONFIG is built from TASK_PROPS
    // after the task class has been added, and RECORDS is built from the two source records.
    static {
        // Register the task implementation class name under the "task.class" key.
        TASK_PROPS.put("task.class", TestSourceTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
        // The two fixture records differ only in their value (VALUE_1 vs VALUE_2); both pass
        // a null Integer as the fourth constructor argument.
        SOURCE_RECORD_1 = new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
        SOURCE_RECORD_2 = new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_2);
        // Default batch used by the poll and send helpers in this class.
        RECORDS = Arrays.asList(SOURCE_RECORD_1, SOURCE_RECORD_2);
    }
}
