package kafka.utils;

import java.io.File;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.Log;
import kafka.log.Log$;
import kafka.log.LogConfig;
import kafka.log.LogConfig$;
import kafka.log.LogManager$;
import kafka.log.ProducerStateManager;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.collection.LinearSeqOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: SchedulerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005)4A\u0001F\u000b\u00015!)\u0011\u0005\u0001C\u0001E!9Q\u0005\u0001b\u0001\n\u00031\u0003B\u0002\u0016\u0001A\u0003%q\u0005C\u0004,\u0001\t\u0007I\u0011\u0001\u0017\t\rA\u0002\u0001\u0015!\u0003.\u0011\u001d\t\u0004A1A\u0005\u0002IBaa\u0010\u0001!\u0002\u0013\u0019\u0004b\u0002!\u0001\u0005\u0004%\tA\r\u0005\u0007\u0003\u0002\u0001\u000b\u0011B\u001a\t\u000b\t\u0003A\u0011A\"\t\u000bA\u0003A\u0011A\"\t\u000bU\u0003A\u0011A\"\t\u000bi\u0003A\u0011A\"\t\u000bq\u0003A\u0011A\"\t\u000by\u0003A\u0011A\"\t\u000b\u0001\u0004A\u0011A\"\t\u000b\t\u0004A\u0011A\"\t\u000b\u0011\u0004A\u0011A\"\t\u000b\u0019\u0004A\u0011A\"\u0003\u001bM\u001b\u0007.\u001a3vY\u0016\u0014H+Z:u\u0015\t1r#A\u0003vi&d7OC\u0001\u0019\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u000e\u0011\u0005qyR\"A\u000f\u000b\u0003y\tQa]2bY\u0006L!\u0001I\u000f\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t1\u0005\u0005\u0002%\u00015\tQ#A\u0005tG\",G-\u001e7feV\tq\u0005\u0005\u0002%Q%\u0011\u0011&\u0006\u0002\u000f\u0017\u000647.Y*dQ\u0016$W\u000f\\3s\u0003)\u00198\r[3ek2,'\u000fI\u0001\t[>\u001c7\u000eV5nKV\tQ\u0006\u0005\u0002%]%\u0011q&\u0006\u0002\t\u001b>\u001c7\u000eV5nK\u0006IQn\\2l)&lW\rI\u0001\tG>,h\u000e^3scU\t1\u0007\u0005\u00025{5\tQG\u0003\u00027o\u00051\u0011\r^8nS\u000eT!\u0001O\u001d\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002;w\u0005!Q\u000f^5m\u0015\u0005a\u0014\u0001\u00026bm\u0006L!AP\u001b\u0003\u001b\u0005#x.\\5d\u0013:$XmZ3s\u0003%\u0019w.\u001e8uKJ\f\u0004%\u0001\u0005d_VtG/\u001a:3\u0003%\u0019w.\u001e8uKJ\u0014\u0004%A\u0003tKR,\b\u000fF\u0001E!\taR)\u0003\u0002G;\t!QK\\5uQ\tQ\u0001\n\u0005\u0002J\u001d6\t!J\u0003\u0002L\u0019\u0006)!.\u001e8ji*\tQ*A\u0002pe\u001eL!a\u0014&\u0003\r\t+gm\u001c:f\u0003!!X-\u0019:e_^t\u0007FA\u0006S!\tI5+\u0003\u0002U\u0015\n)\u0011I\u001a;fe\u0006\u0001C/Z:u\u001b>\u001c7nU2iK\u0012,H.\u001a:O_:\u0004VM]5pI&\u001cG+Y:lQ\taq\u000b\u0005\u0002J1&\u0011\u0011L\u0013\u0002\u0005)\u0016\u001cH/A\u000fuKN$Xj\\2l'\u000eDW\rZ;mKJ\u0004VM]5pI&\u001cG+Y:lQ\tiq+\u0001\u0011uKN$(+Z3oiJ\fg\u000e\u001e+bg.Le.T8dWN\u001b\u0007.\u001a3vY\u0016\u0014\bF\u0001\bX\u0003M!Xm\u001d;O_:\u0004VM]5pI&\u001cG+Y:lQ\tyq+\u0001\tuKN$\b+\u001a:j_\u0012L7\rV1tW\"\u0012\u0001cV\u0001\fi\u0016\u001cHOU3ti\u0006\u0014H\u000f\u000b\u0002\u0012/\u0006QB/Z:u+:\u001c8\r[3ek2,\u0007K]8ek\u000e,'\u000fV1tW\"\u0012!cV\u0001\u0019i\u0016\u001cH/T8dWN\u001b\u0007.\u001a3vY\u0016\u0014Hj\\2lS:<\u0007\u0006B\nXQ&\fq\u0001^5nK>,HO\b\u0002;1\u0003")
/* loaded from: input_file:kafka/utils/SchedulerTest.class */
public class SchedulerTest {
    private final KafkaScheduler scheduler;
    private final MockTime mockTime;
    private final AtomicInteger counter1;
    private final AtomicInteger counter2;

    public KafkaScheduler scheduler() {
        return this.scheduler;
    }

    public MockTime mockTime() {
        return this.mockTime;
    }

    public AtomicInteger counter1() {
        return this.counter1;
    }

    public AtomicInteger counter2() {
        return this.counter2;
    }

    @Before
    public void setup() {
        scheduler().startup();
    }

    @After
    public void teardown() {
        scheduler().shutdown();
    }

    @Test
    public void testMockSchedulerNonPeriodicTask() {
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().scheduler().schedule("test2", () -> {
            this.counter2().getAndIncrement();
        }, 100L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        Assert.assertEquals("Counter1 should not be incremented prior to task running.", 0L, counter1().get());
        Assert.assertEquals("Counter2 should not be incremented prior to task running.", 0L, counter2().get());
        mockTime().sleep(1L);
        Assert.assertEquals("Counter1 should be incremented", 1L, counter1().get());
        Assert.assertEquals("Counter2 should not be incremented", 0L, counter2().get());
        mockTime().sleep(100000L);
        Assert.assertEquals("More sleeping should not result in more incrementing on counter1.", 1L, counter1().get());
        Assert.assertEquals("Counter2 should now be incremented.", 1L, counter2().get());
    }

    @Test
    public void testMockSchedulerPeriodicTask() {
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, 1L, mockTime().scheduler().schedule$default$5());
        mockTime().scheduler().schedule("test2", () -> {
            this.counter2().getAndIncrement();
        }, 100L, 100L, mockTime().scheduler().schedule$default$5());
        Assert.assertEquals("Counter1 should not be incremented prior to task running.", 0L, counter1().get());
        Assert.assertEquals("Counter2 should not be incremented prior to task running.", 0L, counter2().get());
        mockTime().sleep(1L);
        Assert.assertEquals("Counter1 should be incremented", 1L, counter1().get());
        Assert.assertEquals("Counter2 should not be incremented", 0L, counter2().get());
        mockTime().sleep(100L);
        Assert.assertEquals("Counter1 should be incremented 101 times", 101L, counter1().get());
        Assert.assertEquals("Counter2 should not be incremented once", 1L, counter2().get());
    }

    @Test
    public void testReentrantTaskInMockScheduler() {
        mockTime().scheduler().schedule("test1", () -> {
            this.mockTime().scheduler().schedule("test2", () -> {
                this.counter2().getAndIncrement();
            }, 0L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().sleep(1L);
        Assert.assertEquals(1L, counter2().get());
    }

    @Test
    public void testNonPeriodicTask() {
        scheduler().schedule("test", () -> {
            this.counter1().getAndIncrement();
        }, 0L, scheduler().schedule$default$4(), scheduler().schedule$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testNonPeriodicTask$2(this);
                Thread.sleep(5L);
                Assert.assertEquals("Should only run once", 1L, counter1().get());
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                package$ package_ = package$.MODULE$;
                j += Math.min(j, 1000L);
            }
        }
    }

    @Test
    public void testPeriodicTask() {
        scheduler().schedule("test", () -> {
            this.counter1().getAndIncrement();
        }, 0L, 5L, scheduler().schedule$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testPeriodicTask$2(this);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                package$ package_ = package$.MODULE$;
                j += Math.min(j, 1000L);
            }
        }
    }

    @Test
    public void testRestart() {
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().sleep(1L);
        Assert.assertEquals(1L, counter1().get());
        mockTime().scheduler().shutdown();
        mockTime().scheduler().startup();
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().sleep(1L);
        Assert.assertEquals(2L, counter1().get());
    }

    @Test
    public void testUnscheduleProducerTask() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        File randomPartitionLogDir = TestUtils$.MODULE$.randomPartitionLogDir(org.apache.kafka.test.TestUtils.tempDirectory((Path) null, (String) null));
        LogConfig logConfig = new LogConfig(new Properties(), LogConfig$.MODULE$.apply$default$2());
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        TopicPartition parseTopicPartitionName = Log$.MODULE$.parseTopicPartitionName(randomPartitionLogDir);
        Log log = new Log(randomPartitionLogDir, logConfig, 0L, 0L, scheduler(), brokerTopicStats, mockTime(), 3600000, LogManager$.MODULE$.ProducerIdExpirationCheckIntervalMs(), parseTopicPartitionName, new ProducerStateManager(parseTopicPartitionName, randomPartitionLogDir, 3600000), new LogDirFailureChannel(10));
        Assert.assertTrue(scheduler().taskRunning(log.producerExpireCheck()));
        log.close();
        Assert.assertTrue(!scheduler().taskRunning(log.producerExpireCheck()));
    }

    @Test(timeout = 15000)
    public void testMockSchedulerLocking() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(2);
        List colonVar = new $colon.colon(new CountDownLatch(1), new $colon.colon(new CountDownLatch(1), Nil$.MODULE$));
        mockTime().scheduler().schedule("test1", () -> {
            scheduledTask$1((CountDownLatch) colonVar.head(), countDownLatch, countDownLatch2);
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            newSingleThreadScheduledExecutor.scheduleWithFixedDelay(() -> {
                this.mockTime().sleep(1L);
            }, 0L, 1L, TimeUnit.MILLISECONDS);
            Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
            mockTime().scheduler().schedule("test2", () -> {
                scheduledTask$1((CountDownLatch) LinearSeqOps.apply$(colonVar, 1), countDownLatch, countDownLatch2);
            }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
            for (List list = colonVar; !list.isEmpty(); list = (List) list.tail()) {
                ((CountDownLatch) list.head()).countDown();
            }
            Assert.assertTrue("Tasks did not complete", countDownLatch2.await(10L, TimeUnit.SECONDS));
        } finally {
            newSingleThreadScheduledExecutor.shutdownNow();
        }
    }

    public static final /* synthetic */ void $anonfun$testNonPeriodicTask$2(SchedulerTest schedulerTest) {
        Assert.assertEquals(schedulerTest.counter1().get(), 1L);
    }

    public static final /* synthetic */ void $anonfun$testPeriodicTask$2(SchedulerTest schedulerTest) {
        Assert.assertTrue("Should count to 20", schedulerTest.counter1().get() >= 20);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void scheduledTask$1(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, CountDownLatch countDownLatch3) {
        countDownLatch2.countDown();
        Assert.assertTrue("Timed out waiting for latch", countDownLatch.await(30L, TimeUnit.SECONDS));
        countDownLatch3.countDown();
    }

    public SchedulerTest() {
        KafkaScheduler$ kafkaScheduler$ = KafkaScheduler$.MODULE$;
        KafkaScheduler$ kafkaScheduler$2 = KafkaScheduler$.MODULE$;
        this.scheduler = new KafkaScheduler(1, "kafka-scheduler-", true);
        this.mockTime = new MockTime();
        this.counter1 = new AtomicInteger(0);
        this.counter2 = new AtomicInteger(0);
    }

    public static final /* synthetic */ Object $anonfun$testMockSchedulerLocking$4$adapted(CountDownLatch countDownLatch) {
        countDownLatch.countDown();
        return BoxedUnit.UNIT;
    }
}
