package org.apache.kafka.connect.runtime.errors;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.PluginsTest;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.OngoingStubbing;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.class */
public class RetryWithToleranceOperatorTest {
    private static final Map<String, String> PROPERTIES = new HashMap<String, String>() { // from class: org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.1
        {
            put("metrics.num.samples", Objects.toString(2));
            put("metrics.sample.window.ms", Objects.toString(3000));
            put("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
            put("key.converter", PluginsTest.TestConverter.class.getName());
            put("value.converter", PluginsTest.TestConverter.class.getName());
        }
    };
    public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(0, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM, new ErrorHandlingMetrics(new ConnectorTaskId("noop-connector", -1), new ConnectMetrics("noop-worker", new PluginsTest.TestableWorkerConfig(PROPERTIES), new SystemTime(), "test-cluster")));
    public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator(0, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM, new ErrorHandlingMetrics(new ConnectorTaskId("errors-all-tolerate-connector", -1), new ConnectMetrics("errors-all-tolerate-worker", new PluginsTest.TestableWorkerConfig(PROPERTIES), new SystemTime(), "test-cluster")));

    @Mock
    private Operation<String> mockOperation;

    @Mock
    private ConsumerRecord<byte[], byte[]> consumerRecord;

    @Mock
    ErrorHandlingMetrics errorHandlingMetrics;

    @Mock
    Plugins plugins;

    @Test
    public void testExecuteFailed() {
        new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM, this.errorHandlingMetrics).executeFailed(Stage.TASK_PUT, SinkTask.class, this.consumerRecord, new Throwable());
    }

    @Test
    public void testExecuteFailedNoTolerance() {
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM, this.errorHandlingMetrics);
        Assert.assertThrows(ConnectException.class, () -> {
            retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, this.consumerRecord, new Throwable());
        });
    }

    @Test
    public void testHandleExceptionInTransformations() {
        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
    }

    @Test
    public void testHandleExceptionInHeaderConverter() {
        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception());
    }

    @Test
    public void testHandleExceptionInValueConverter() {
        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception());
    }

    @Test
    public void testHandleExceptionInKeyConverter() {
        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception());
    }

    @Test
    public void testHandleExceptionInTaskPut() {
        testHandleExceptionInStage(Stage.TASK_PUT, new RetriableException("Test"));
    }

    @Test
    public void testHandleExceptionInTaskPoll() {
        testHandleExceptionInStage(Stage.TASK_POLL, new RetriableException("Test"));
    }

    @Test
    public void testThrowExceptionInTaskPut() {
        Assert.assertThrows(ConnectException.class, () -> {
            testHandleExceptionInStage(Stage.TASK_PUT, new Exception());
        });
    }

    @Test
    public void testThrowExceptionInTaskPoll() {
        Assert.assertThrows(ConnectException.class, () -> {
            testHandleExceptionInStage(Stage.TASK_POLL, new Exception());
        });
    }

    @Test
    public void testThrowExceptionInKafkaConsume() {
        Assert.assertThrows(ConnectException.class, () -> {
            testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception());
        });
    }

    @Test
    public void testThrowExceptionInKafkaProduce() {
        Assert.assertThrows(ConnectException.class, () -> {
            testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception());
        });
    }

    private void testHandleExceptionInStage(Stage stage, Exception exc) {
        RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor();
        retryWithToleranceOperator.execute(() -> {
            throw exc;
        }, stage, RetryWithToleranceOperator.class);
        Assert.assertTrue(retryWithToleranceOperator.failed());
    }

    private RetryWithToleranceOperator setupExecutor() {
        return new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM, this.errorHandlingMetrics);
    }

    @Test
    public void testExecAndHandleRetriableErrorOnce() throws Exception {
        execAndHandleRetriableError(6000L, 1, Collections.singletonList(300L), new RetriableException("Test"), true);
    }

    @Test
    public void testExecAndHandleRetriableErrorThrice() throws Exception {
        execAndHandleRetriableError(6000L, 3, Arrays.asList(300L, 600L, 1200L), new RetriableException("Test"), true);
    }

    @Test
    public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws Exception {
        execAndHandleRetriableError(-1L, 8, Arrays.asList(300L, 600L, 1200L, 2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true);
    }

    @Test
    public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Exception {
        execAndHandleRetriableError(6000L, 6, Arrays.asList(300L, 600L, 1200L, 2400L, 1500L), new RetriableException("Test"), false);
    }

    public void execAndHandleRetriableError(long j, int i, List<Long> list, Exception exc, boolean z) throws Exception {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        CountDownLatch countDownLatch = (CountDownLatch) Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(j, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, mockTime, this.errorHandlingMetrics, new ProcessingContext(), countDownLatch);
        OngoingStubbing when = Mockito.when((String) this.mockOperation.call());
        for (int i2 = 0; i2 < i; i2++) {
            when = when.thenThrow(new Throwable[]{exc});
        }
        if (z) {
            when.thenReturn("Success");
        }
        for (Long l : list) {
            Mockito.when(Boolean.valueOf(countDownLatch.await(l.longValue(), TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock -> {
                mockTime.sleep(l.longValue());
                return false;
            });
        }
        String str = (String) retryWithToleranceOperator.execAndHandleError(this.mockOperation, Exception.class);
        if (z) {
            Assert.assertFalse(retryWithToleranceOperator.failed());
            Assert.assertEquals("Success", str);
        } else {
            Assert.assertTrue(retryWithToleranceOperator.failed());
        }
        Mockito.verifyNoMoreInteractions(new Object[]{countDownLatch});
        ((Operation) Mockito.verify(this.mockOperation, Mockito.times(z ? i + 1 : i))).call();
    }

    @Test
    public void testExecAndHandleNonRetriableError() throws Exception {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        CountDownLatch countDownLatch = (CountDownLatch) Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, mockTime, this.errorHandlingMetrics, new ProcessingContext(), countDownLatch);
        Mockito.when((String) this.mockOperation.call()).thenThrow(new Throwable[]{new Exception("Test")});
        String str = (String) retryWithToleranceOperator.execAndHandleError(this.mockOperation, Exception.class);
        Assert.assertTrue(retryWithToleranceOperator.failed());
        Assert.assertNull(str);
        ((Operation) Mockito.verify(this.mockOperation)).call();
        ((CountDownLatch) Mockito.verify(countDownLatch, Mockito.never())).await(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any());
    }

    @Test
    public void testExitLatch() throws Exception {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        CountDownLatch countDownLatch = (CountDownLatch) Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, mockTime, this.errorHandlingMetrics, new ProcessingContext(), countDownLatch);
        Mockito.when((String) this.mockOperation.call()).thenThrow(new Throwable[]{new RetriableException("test")});
        Mockito.when(Boolean.valueOf(countDownLatch.await(300L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock -> {
            mockTime.sleep(300L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(600L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock2 -> {
            mockTime.sleep(600L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(1200L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock3 -> {
            mockTime.sleep(1200L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(2400L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock4 -> {
            mockTime.sleep(2400L);
            retryWithToleranceOperator.triggerStop();
            return false;
        });
        retryWithToleranceOperator.execAndHandleError(this.mockOperation, Exception.class);
        Assert.assertTrue(retryWithToleranceOperator.failed());
        Assert.assertEquals(4500L, mockTime.milliseconds());
        ((CountDownLatch) Mockito.verify(countDownLatch)).countDown();
        Mockito.verifyNoMoreInteractions(new Object[]{countDownLatch});
    }

    @Test
    public void testBackoffLimit() throws Exception {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        CountDownLatch countDownLatch = (CountDownLatch) Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5L, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS, ToleranceType.NONE, mockTime, this.errorHandlingMetrics, new ProcessingContext(), countDownLatch);
        Mockito.when(Boolean.valueOf(countDownLatch.await(300L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock -> {
            mockTime.sleep(300L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(600L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock2 -> {
            mockTime.sleep(600L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(1200L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock3 -> {
            mockTime.sleep(1200L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(2400L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock4 -> {
            mockTime.sleep(2400L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock5 -> {
            mockTime.sleep(500L);
            return false;
        });
        Mockito.when(Boolean.valueOf(countDownLatch.await(0L, TimeUnit.MILLISECONDS))).thenReturn(false);
        retryWithToleranceOperator.backoff(1, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        retryWithToleranceOperator.backoff(2, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        retryWithToleranceOperator.backoff(3, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        retryWithToleranceOperator.backoff(4, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        retryWithToleranceOperator.backoff(5, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        mockTime.sleep(1L);
        retryWithToleranceOperator.backoff(6, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        Mockito.verifyNoMoreInteractions(new Object[]{countDownLatch});
    }

    @Test
    public void testToleranceLimit() {
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM, this.errorHandlingMetrics);
        retryWithToleranceOperator.markAsFailed();
        Assert.assertFalse("should not tolerate any errors", retryWithToleranceOperator.withinToleranceLimits());
        RetryWithToleranceOperator retryWithToleranceOperator2 = new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM, this.errorHandlingMetrics);
        retryWithToleranceOperator2.markAsFailed();
        retryWithToleranceOperator2.markAsFailed();
        Assert.assertTrue("should tolerate all errors", retryWithToleranceOperator2.withinToleranceLimits());
        Assert.assertTrue("no tolerance is within limits if no failures", new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM, this.errorHandlingMetrics).withinToleranceLimits());
    }

    @Test
    public void testDefaultConfigs() {
        ConnectorConfig config = config(Collections.emptyMap());
        Assert.assertEquals(config.errorRetryTimeout(), 0L);
        Assert.assertEquals(config.errorMaxDelayInMillis(), ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS);
        Assert.assertEquals(config.errorToleranceType(), ConnectorConfig.ERRORS_TOLERANCE_DEFAULT);
    }

    ConnectorConfig config(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "test");
        hashMap.put("connector.class", SinkConnector.class.getName());
        hashMap.putAll(map);
        return new ConnectorConfig(this.plugins, hashMap);
    }

    @Test
    public void testSetConfigs() {
        Assert.assertEquals(config(Collections.singletonMap("errors.retry.timeout", "100")).errorRetryTimeout(), 100L);
        Assert.assertEquals(config(Collections.singletonMap("errors.retry.delay.max.ms", "100")).errorMaxDelayInMillis(), 100L);
        Assert.assertEquals(config(Collections.singletonMap("errors.tolerance", "none")).errorToleranceType(), ToleranceType.NONE);
    }

    @Test
    public void testThreadSafety() throws Throwable {
        long j = ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS;
        int i = 10;
        final AtomicReference atomicReference = new AtomicReference(null);
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM, this.errorHandlingMetrics, new ProcessingContext() { // from class: org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.2
            private final AtomicInteger count = new AtomicInteger();
            private final AtomicInteger attempt = new AtomicInteger();

            public void error(Throwable th) {
                if (this.count.getAndIncrement() > 0) {
                    atomicReference.compareAndSet(null, new AssertionError("Concurrent call to error()"));
                }
                super.error(th);
            }

            public Future<Void> report() {
                if (this.count.getAndSet(0) > 1) {
                    atomicReference.compareAndSet(null, new AssertionError("Concurrent call to error() in report()"));
                }
                return super.report();
            }

            public void currentContext(Stage stage, Class<?> cls) {
                this.attempt.set(0);
                super.currentContext(stage, cls);
            }

            public void attempt(int i2) {
                if (!this.attempt.compareAndSet(i2 - 1, i2)) {
                    atomicReference.compareAndSet(null, new AssertionError("Concurrent call to attempt(): Attempts should increase monotonically within the scope of a given currentContext()"));
                }
                super.attempt(i2);
            }
        }, new CountDownLatch(1));
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        List list = (List) IntStream.range(0, 10).boxed().map(num -> {
            return newFixedThreadPool.submit(() -> {
                long currentTimeMillis = System.currentTimeMillis();
                long j2 = 0;
                while (true) {
                    long j3 = j2 + 1;
                    j2 = j3;
                    if ((j3 % 10000 == 0 && System.currentTimeMillis() > currentTimeMillis + j) || atomicReference.get() != null) {
                        return;
                    }
                    try {
                        if (num.intValue() < i / 2) {
                            retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, this.consumerRecord, new Throwable()).get();
                        } else {
                            retryWithToleranceOperator.execute(() -> {
                                return null;
                            }, Stage.TRANSFORMATION, SinkTask.class);
                        }
                    } catch (Exception e) {
                        atomicReference.compareAndSet(null, e);
                    }
                }
            });
        }).collect(Collectors.toList());
        newFixedThreadPool.shutdown();
        newFixedThreadPool.awaitTermination((long) (1.5d * ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS), TimeUnit.MILLISECONDS);
        list.forEach(future -> {
            try {
                future.get();
            } catch (Exception e) {
                atomicReference.compareAndSet(null, e);
            }
        });
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            throw th;
        }
    }
}
