/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StateDirectoryTest {
    private final MockTime time = new MockTime();
    private File stateDir;
    private final String applicationId = "applicationId";
    private StateDirectory directory;
    private File appDir;

    private void initializeStateDirectory(boolean createStateDirectory, boolean hasNamedTopology) throws IOException {
        this.stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString((int)5));
        if (!createStateDirectory) {
            this.cleanup();
        }
        this.directory = new StateDirectory(new StreamsConfig((Map)new Properties(){
            {
                this.put("application.id", "applicationId");
                this.put("bootstrap.servers", "dummy:1234");
                this.put("state.dir", StateDirectoryTest.this.stateDir.getPath());
            }
        }), (Time)this.time, createStateDirectory, hasNamedTopology);
        this.appDir = new File(this.stateDir, "applicationId");
    }

    @Before
    public void before() throws IOException {
        this.initializeStateDirectory(true, false);
    }

    @After
    public void cleanup() throws IOException {
        Utils.delete((File)this.stateDir);
    }

    @Test
    public void shouldCreateBaseDirectory() {
        Assert.assertTrue((boolean)this.stateDir.exists());
        Assert.assertTrue((boolean)this.stateDir.isDirectory());
        Assert.assertTrue((boolean)this.appDir.exists());
        Assert.assertTrue((boolean)this.appDir.isDirectory());
    }

    @Test
    public void shouldHaveSecurePermissions() {
        this.assertPermissions(this.stateDir);
        this.assertPermissions(this.appDir);
    }

    private void assertPermissions(File file) {
        Path path = file.toPath();
        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            EnumSet<PosixFilePermission> expectedPermissions = EnumSet.of(PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ, PosixFilePermission.OWNER_WRITE, PosixFilePermission.GROUP_EXECUTE, PosixFilePermission.OWNER_READ);
            try {
                Set<PosixFilePermission> filePermissions = Files.getPosixFilePermissions(path, new LinkOption[0]);
                MatcherAssert.assertThat(expectedPermissions, (Matcher)CoreMatchers.equalTo(filePermissions));
            }
            catch (IOException e) {
                Assert.fail((String)"Should create correct files and set correct permissions");
            }
        } else {
            MatcherAssert.assertThat((Object)file.canRead(), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)file.canWrite(), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)file.canExecute(), (Matcher)CoreMatchers.is((Object)true));
        }
    }

    @Test
    public void shouldParseUnnamedTaskId() {
        TaskId task = new TaskId(1, 0);
        MatcherAssert.assertThat((Object)TaskId.parse((String)task.toString()), (Matcher)CoreMatchers.equalTo((Object)task));
    }

    @Test
    public void shouldParseNamedTaskId() {
        TaskId task = new TaskId(1, 0, "namedTopology");
        MatcherAssert.assertThat((Object)TaskId.parse((String)task.toString()), (Matcher)CoreMatchers.equalTo((Object)task));
    }

    @Test
    public void shouldCreateTaskStateDirectory() {
        TaskId taskId = new TaskId(0, 0);
        File taskDirectory = this.directory.getOrCreateDirectoryForTask(taskId);
        Assert.assertTrue((boolean)taskDirectory.exists());
        Assert.assertTrue((boolean)taskDirectory.isDirectory());
    }

    @Test
    public void shouldBeTrueIfAlreadyHoldsLock() {
        TaskId taskId = new TaskId(0, 0);
        this.directory.getOrCreateDirectoryForTask(taskId);
        this.directory.lock(taskId);
        try {
            Assert.assertTrue((boolean)this.directory.lock(taskId));
        }
        finally {
            this.directory.unlock(taskId);
        }
    }

    @Test
    public void shouldBeAbleToUnlockEvenWithoutLocking() {
        TaskId taskId = new TaskId(0, 0);
        this.directory.unlock(taskId);
    }

    @Test
    public void shouldReportDirectoryEmpty() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        Assert.assertTrue((boolean)this.directory.directoryForTaskIsEmpty(taskId));
        this.directory.lock(taskId);
        Assert.assertTrue((boolean)this.directory.directoryForTaskIsEmpty(taskId));
        OffsetCheckpoint checkpointFile = new OffsetCheckpoint(new File(this.directory.getOrCreateDirectoryForTask(taskId), ".checkpoint"));
        Assert.assertTrue((boolean)this.directory.directoryForTaskIsEmpty(taskId));
        checkpointFile.write(Collections.singletonMap(new TopicPartition("topic", 0), 0L));
        Assert.assertTrue((boolean)this.directory.directoryForTaskIsEmpty(taskId));
        File dbDir = new File(new File(this.directory.getOrCreateDirectoryForTask(taskId), "db"), "store1");
        Files.createDirectories(dbDir.getParentFile().toPath(), new FileAttribute[0]);
        Files.createDirectories(dbDir.getAbsoluteFile().toPath(), new FileAttribute[0]);
        Assert.assertFalse((boolean)this.directory.directoryForTaskIsEmpty(taskId));
        Utils.delete((File)dbDir.getParentFile());
        Assert.assertTrue((boolean)this.directory.directoryForTaskIsEmpty(taskId));
        this.directory.unlock(taskId);
        Assert.assertTrue((boolean)this.directory.directoryForTaskIsEmpty(taskId));
    }

    @Test
    public void shouldThrowProcessorStateException() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        Utils.delete((File)this.stateDir);
        Assert.assertThrows(ProcessorStateException.class, () -> this.directory.getOrCreateDirectoryForTask(taskId));
    }

    @Test
    public void shouldThrowProcessorStateExceptionIfStateDirOccupied() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        Utils.delete((File)this.appDir);
        Files.createFile(this.appDir.toPath(), new FileAttribute[0]);
        Assert.assertThrows(ProcessorStateException.class, () -> this.directory.getOrCreateDirectoryForTask(taskId));
    }

    @Test
    public void shouldThrowProcessorStateExceptionIfTestDirOccupied() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        File taskDir = new File(this.appDir, StateManagerUtil.toTaskDirString((TaskId)taskId));
        Utils.delete((File)taskDir);
        Files.createFile(taskDir.toPath(), new FileAttribute[0]);
        Assert.assertThrows(ProcessorStateException.class, () -> this.directory.getOrCreateDirectoryForTask(taskId));
    }

    @Test
    public void shouldNotThrowIfStateDirectoryHasBeenDeleted() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        Utils.delete((File)this.stateDir);
        Assert.assertThrows(IllegalStateException.class, () -> this.directory.lock(taskId));
    }

    @Test
    public void shouldLockMultipleTaskDirectories() {
        TaskId taskId = new TaskId(0, 0);
        TaskId taskId2 = new TaskId(1, 0);
        MatcherAssert.assertThat((Object)this.directory.lock(taskId), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.directory.lock(taskId2), (Matcher)CoreMatchers.is((Object)true));
        this.directory.unlock(taskId);
        this.directory.unlock(taskId2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() {
        TaskId task0 = new TaskId(0, 0);
        TaskId task1 = new TaskId(1, 0);
        TaskId task2 = new TaskId(2, 0);
        try {
            Assert.assertTrue((boolean)new File(this.directory.getOrCreateDirectoryForTask(task0), "store").mkdir());
            Assert.assertTrue((boolean)new File(this.directory.getOrCreateDirectoryForTask(task1), "store").mkdir());
            Assert.assertTrue((boolean)new File(this.directory.getOrCreateDirectoryForTask(task2), "store").mkdir());
            this.directory.lock(task0);
            this.directory.lock(task1);
            StateDirectory.TaskDirectory dir0 = new StateDirectory.TaskDirectory(new File(this.appDir, StateManagerUtil.toTaskDirString((TaskId)task0)), null);
            StateDirectory.TaskDirectory dir1 = new StateDirectory.TaskDirectory(new File(this.appDir, StateManagerUtil.toTaskDirString((TaskId)task1)), null);
            StateDirectory.TaskDirectory dir2 = new StateDirectory.TaskDirectory(new File(this.appDir, StateManagerUtil.toTaskDirString((TaskId)task2)), null);
            List files = this.directory.listAllTaskDirectories();
            Assert.assertEquals((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{dir0, dir1, dir2}), new HashSet(files));
            files = this.directory.listNonEmptyTaskDirectories();
            Assert.assertEquals((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{dir0, dir1, dir2}), new HashSet(files));
            this.time.sleep(5000L);
            this.directory.cleanRemovedTasks(0L);
            files = this.directory.listAllTaskDirectories();
            Assert.assertEquals((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{dir0, dir1}), new HashSet(files));
            files = this.directory.listNonEmptyTaskDirectories();
            Assert.assertEquals((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{dir0, dir1}), new HashSet(files));
        }
        finally {
            this.directory.unlock(task0);
            this.directory.unlock(task1);
        }
    }

    @Test
    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
        File dir = this.directory.getOrCreateDirectoryForTask(new TaskId(2, 0));
        Assert.assertTrue((boolean)new File(dir, "store").mkdir());
        int cleanupDelayMs = 60000;
        this.directory.cleanRemovedTasks(60000L);
        Assert.assertTrue((boolean)dir.exists());
        Assert.assertEquals((long)1L, (long)this.directory.listAllTaskDirectories().size());
        Assert.assertEquals((long)1L, (long)this.directory.listNonEmptyTaskDirectories().size());
        this.time.sleep(61000L);
        this.directory.cleanRemovedTasks(60000L);
        Assert.assertFalse((boolean)dir.exists());
        Assert.assertEquals((long)0L, (long)this.directory.listAllTaskDirectories().size());
        Assert.assertEquals((long)0L, (long)this.directory.listNonEmptyTaskDirectories().size());
    }

    @Test
    public void shouldCleanupObsoleteTaskDirectoriesAndDeleteTheDirectoryItself() {
        File dir = this.directory.getOrCreateDirectoryForTask(new TaskId(2, 0));
        Assert.assertTrue((boolean)new File(dir, "store").mkdir());
        Assert.assertEquals((long)1L, (long)this.directory.listAllTaskDirectories().size());
        Assert.assertEquals((long)1L, (long)this.directory.listNonEmptyTaskDirectories().size());
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            this.time.sleep(5000L);
            this.directory.cleanRemovedTasks(0L);
            Assert.assertFalse((boolean)dir.exists());
            Assert.assertEquals((long)0L, (long)this.directory.listAllTaskDirectories().size());
            Assert.assertEquals((long)0L, (long)this.directory.listNonEmptyTaskDirectories().size());
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.containsString((String)"Deleting obsolete state directory")));
        }
    }

    @Test
    public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
        File otherDir = TestUtils.tempDirectory((Path)this.stateDir.toPath(), (String)"foo");
        this.directory.cleanRemovedTasks(0L);
        Assert.assertTrue((boolean)otherDir.exists());
    }

    @Test
    public void shouldReturnEmptyArrayForNonPersistentApp() throws IOException {
        this.initializeStateDirectory(false, false);
        Assert.assertTrue((boolean)this.directory.listAllTaskDirectories().isEmpty());
    }

    @Test
    public void shouldReturnEmptyArrayIfStateDirDoesntExist() throws IOException {
        this.cleanup();
        Assert.assertFalse((boolean)this.stateDir.exists());
        Assert.assertTrue((boolean)this.directory.listAllTaskDirectories().isEmpty());
    }

    @Test
    public void shouldReturnEmptyArrayIfListFilesReturnsNull() throws IOException {
        this.stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString((int)5));
        this.directory = new StateDirectory(new StreamsConfig((Map)new Properties(){
            {
                this.put("application.id", "applicationId");
                this.put("bootstrap.servers", "dummy:1234");
                this.put("state.dir", StateDirectoryTest.this.stateDir.getPath());
            }
        }), (Time)this.time, true, false);
        this.appDir = new File(this.stateDir, "applicationId");
        Utils.delete((File)this.appDir);
        Files.createFile(this.appDir.toPath(), new FileAttribute[0]);
        Assert.assertTrue((boolean)Files.exists(this.appDir.toPath(), new LinkOption[0]));
        Assert.assertNull((Object)this.appDir.listFiles());
        Assert.assertEquals((long)0L, (long)this.directory.listAllTaskDirectories().size());
    }

    @Test
    public void shouldOnlyListNonEmptyTaskDirectories() throws IOException {
        TestUtils.tempDirectory((Path)this.stateDir.toPath(), (String)"foo");
        StateDirectory.TaskDirectory taskDir1 = new StateDirectory.TaskDirectory(this.directory.getOrCreateDirectoryForTask(new TaskId(0, 0)), null);
        StateDirectory.TaskDirectory taskDir2 = new StateDirectory.TaskDirectory(this.directory.getOrCreateDirectoryForTask(new TaskId(0, 1)), null);
        File storeDir = new File(taskDir1.file(), "store");
        Assert.assertTrue((boolean)storeDir.mkdir());
        MatcherAssert.assertThat((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{taskDir1, taskDir2}), (Matcher)CoreMatchers.equalTo(new HashSet(this.directory.listAllTaskDirectories())));
        MatcherAssert.assertThat(Collections.singletonList(taskDir1), (Matcher)CoreMatchers.equalTo((Object)this.directory.listNonEmptyTaskDirectories()));
        Utils.delete((File)taskDir1.file());
        MatcherAssert.assertThat(Collections.singleton(taskDir2), (Matcher)CoreMatchers.equalTo(new HashSet(this.directory.listAllTaskDirectories())));
        MatcherAssert.assertThat(Collections.emptyList(), (Matcher)CoreMatchers.equalTo((Object)this.directory.listNonEmptyTaskDirectories()));
    }

    @Test
    public void shouldCreateDirectoriesIfParentDoesntExist() {
        File tempDir = TestUtils.tempDirectory();
        final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
        StateDirectory stateDirectory = new StateDirectory(new StreamsConfig((Map)new Properties(){
            {
                this.put("application.id", "applicationId");
                this.put("bootstrap.servers", "dummy:1234");
                this.put("state.dir", stateDir.getPath());
            }
        }), (Time)this.time, true, false);
        File taskDir = stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0));
        Assert.assertTrue((boolean)stateDir.exists());
        Assert.assertTrue((boolean)taskDir.exists());
    }

    @Test
    public void shouldNotLockStateDirLockedByAnotherThread() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        Thread thread = new Thread(() -> this.directory.lock(taskId));
        thread.start();
        thread.join(30000L);
        Assert.assertFalse((boolean)this.directory.lock(taskId));
    }

    @Test
    public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        CountDownLatch lockLatch = new CountDownLatch(1);
        CountDownLatch unlockLatch = new CountDownLatch(1);
        AtomicReference exceptionOnThread = new AtomicReference();
        Thread thread = new Thread(() -> {
            try {
                this.directory.lock(taskId);
                lockLatch.countDown();
                unlockLatch.await();
                this.directory.unlock(taskId);
            }
            catch (Exception e) {
                exceptionOnThread.set(e);
            }
        });
        thread.start();
        lockLatch.await(5L, TimeUnit.SECONDS);
        Assert.assertNull((String)"should not have had an exception on other thread", exceptionOnThread.get());
        this.directory.unlock(taskId);
        Assert.assertFalse((boolean)this.directory.lock(taskId));
        unlockLatch.countDown();
        thread.join(30000L);
        Assert.assertNull((String)"should not have had an exception on other thread", exceptionOnThread.get());
        Assert.assertTrue((boolean)this.directory.lock(taskId));
    }

    @Test
    public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
        TaskId id = new TaskId(1, 0);
        this.directory.getOrCreateDirectoryForTask(id);
        this.directory.globalStateDir();
        File dir0 = new File(this.appDir, id.toString());
        File globalDir = new File(this.appDir, "global");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new File[]{dir0, globalDir}), Arrays.stream(Objects.requireNonNull(this.appDir.listFiles())).collect(Collectors.toSet()));
        this.directory.clean();
        Assert.assertFalse((boolean)this.appDir.exists());
    }

    @Test
    public void shouldNotCreateBaseDirectory() throws IOException {
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            this.initializeStateDirectory(false, false);
            MatcherAssert.assertThat((Object)this.stateDir.exists(), (Matcher)CoreMatchers.is((Object)false));
            MatcherAssert.assertThat((Object)this.appDir.exists(), (Matcher)CoreMatchers.is((Object)false));
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.not((Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.containsString((String)"Error changing permissions for the state or base directory"))));
        }
    }

    @Test
    public void shouldNotCreateTaskStateDirectory() throws IOException {
        this.initializeStateDirectory(false, false);
        TaskId taskId = new TaskId(0, 0);
        File taskDirectory = this.directory.getOrCreateDirectoryForTask(taskId);
        Assert.assertFalse((boolean)taskDirectory.exists());
    }

    @Test
    public void shouldNotCreateGlobalStateDirectory() throws IOException {
        this.initializeStateDirectory(false, false);
        File globalStateDir = this.directory.globalStateDir();
        Assert.assertFalse((boolean)globalStateDir.exists());
    }

    @Test
    public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
        this.initializeStateDirectory(false, false);
        TaskId taskId = new TaskId(0, 0);
        Assert.assertTrue((boolean)this.directory.lock(taskId));
    }

    @Test
    public void shouldNotFailWhenCreatingTaskDirectoryInParallel() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        AtomicBoolean passed = new AtomicBoolean(true);
        CreateTaskDirRunner runner = new CreateTaskDirRunner(this.directory, taskId, passed);
        Thread t1 = new Thread(runner);
        Thread t2 = new Thread(runner);
        t1.start();
        t2.start();
        t1.join(Duration.ofMillis(500L).toMillis());
        t2.join(Duration.ofMillis(500L).toMillis());
        Assert.assertNotNull((Object)runner.taskDirectory);
        Assert.assertTrue((boolean)passed.get());
        Assert.assertTrue((boolean)runner.taskDirectory.exists());
        Assert.assertTrue((boolean)runner.taskDirectory.isDirectory());
    }

    @Test
    public void shouldDeleteAppDirWhenCleanUpIfEmpty() {
        TaskId taskId = new TaskId(0, 0);
        File taskDirectory = this.directory.getOrCreateDirectoryForTask(taskId);
        File testFile = new File(taskDirectory, "testFile");
        MatcherAssert.assertThat((Object)testFile.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.directory.directoryForTaskIsEmpty(taskId), (Matcher)CoreMatchers.is((Object)false));
        this.directory.clean();
        Assert.assertFalse((boolean)this.appDir.exists());
    }

    @Test
    public void shouldNotDeleteAppDirWhenCleanUpIfNotEmpty() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        File taskDirectory = this.directory.getOrCreateDirectoryForTask(taskId);
        File testFile = new File(taskDirectory, "testFile");
        MatcherAssert.assertThat((Object)testFile.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.directory.directoryForTaskIsEmpty(taskId), (Matcher)CoreMatchers.is((Object)false));
        File dummyFile = new File(this.appDir, "dummy");
        Files.createFile(dummyFile.toPath(), new FileAttribute[0]);
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            this.directory.clean();
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.endsWith((String)String.format("Failed to delete state store directory of %s for it is not empty", this.appDir.getAbsolutePath()))));
        }
    }

    @Test
    public void shouldLogManualUserCallMessage() {
        TaskId taskId = new TaskId(0, 0);
        File taskDirectory = this.directory.getOrCreateDirectoryForTask(taskId);
        File testFile = new File(taskDirectory, "testFile");
        MatcherAssert.assertThat((Object)testFile.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.directory.directoryForTaskIsEmpty(taskId), (Matcher)CoreMatchers.is((Object)false));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            this.directory.clean();
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.endsWith((String)"as user calling cleanup.")));
        }
    }

    @Test
    public void shouldLogStateDirCleanerMessage() {
        TaskId taskId = new TaskId(0, 0);
        File taskDirectory = this.directory.getOrCreateDirectoryForTask(taskId);
        File testFile = new File(taskDirectory, "testFile");
        MatcherAssert.assertThat((Object)testFile.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.directory.directoryForTaskIsEmpty(taskId), (Matcher)CoreMatchers.is((Object)false));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            long cleanupDelayMs = 0L;
            this.time.sleep(5000L);
            this.directory.cleanRemovedTasks(0L);
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.endsWith((String)"ms has elapsed (cleanup delay is 0ms).")));
        }
    }

    @Test
    public void shouldLogTempDirMessage() {
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            new StateDirectory(new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)""), Utils.mkEntry((Object)"application.id", (Object)"")})), (Time)new MockTime(), true, false);
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)("Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [" + System.getProperty("java.io.tmpdir") + "/kafka-streams]")));
        }
    }

    @Test
    public void shouldCreateTaskDirectoriesUnderNamedTopologyDirs() throws IOException {
        this.initializeStateDirectory(true, true);
        this.directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology1"));
        this.directory.getOrCreateDirectoryForTask(new TaskId(0, 1, "topology1"));
        this.directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology2"));
        MatcherAssert.assertThat((Object)new File(this.appDir, "__topology1__").exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(this.appDir, "__topology1__").isDirectory(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(this.appDir, "__topology2__").exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(this.appDir, "__topology2__").isDirectory(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(new File(this.appDir, "__topology1__"), "0_0").exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(new File(this.appDir, "__topology1__"), "0_0").isDirectory(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(new File(this.appDir, "__topology1__"), "0_1").exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(new File(this.appDir, "__topology1__"), "0_1").isDirectory(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(new File(this.appDir, "__topology2__"), "0_0").exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)new File(new File(this.appDir, "__topology2__"), "0_0").isDirectory(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldOnlyListNonEmptyTaskDirectoriesInNamedTopologies() throws IOException {
        this.initializeStateDirectory(true, true);
        TestUtils.tempDirectory((Path)this.appDir.toPath(), (String)"foo");
        StateDirectory.TaskDirectory taskDir1 = new StateDirectory.TaskDirectory(this.directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology1")), "topology1");
        StateDirectory.TaskDirectory taskDir2 = new StateDirectory.TaskDirectory(this.directory.getOrCreateDirectoryForTask(new TaskId(0, 1, "topology1")), "topology1");
        StateDirectory.TaskDirectory taskDir3 = new StateDirectory.TaskDirectory(this.directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology2")), "topology2");
        File storeDir = new File(taskDir1.file(), "store");
        Assert.assertTrue((boolean)storeDir.mkdir());
        MatcherAssert.assertThat(new HashSet(this.directory.listAllTaskDirectories()), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{taskDir1, taskDir2, taskDir3})));
        MatcherAssert.assertThat((Object)this.directory.listNonEmptyTaskDirectories(), (Matcher)CoreMatchers.equalTo(Collections.singletonList(taskDir1)));
        Utils.delete((File)taskDir1.file());
        MatcherAssert.assertThat(new HashSet(this.directory.listAllTaskDirectories()), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new StateDirectory.TaskDirectory[]{taskDir2, taskDir3})));
        MatcherAssert.assertThat((Object)this.directory.listNonEmptyTaskDirectories(), (Matcher)CoreMatchers.equalTo(Collections.emptyList()));
    }

    @Test
    public void shouldRemoveNonEmptyNamedTopologyDirsWhenCallingClean() throws Exception {
        this.initializeStateDirectory(true, true);
        File taskDir = this.directory.getOrCreateDirectoryForTask(new TaskId(2, 0, "topology1"));
        File namedTopologyDir = new File(this.appDir, "__topology1__");
        MatcherAssert.assertThat((Object)taskDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        this.directory.clean();
        MatcherAssert.assertThat((Object)taskDir.exists(), (Matcher)CoreMatchers.is((Object)false));
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldRemoveEmptyNamedTopologyDirsWhenCallingClean() throws IOException {
        this.initializeStateDirectory(true, true);
        File namedTopologyDir = new File(this.appDir, "__topology1__");
        MatcherAssert.assertThat((Object)namedTopologyDir.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        this.directory.clean();
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldRemoveNonEmptyNamedTopologyDirsWhenCallingClearLocalStateForNamedTopology() throws Exception {
        this.initializeStateDirectory(true, true);
        String topologyName = "topology1";
        File taskDir = this.directory.getOrCreateDirectoryForTask(new TaskId(2, 0, "topology1"));
        File namedTopologyDir = new File(this.appDir, "__topology1__");
        MatcherAssert.assertThat((Object)taskDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        this.directory.clearLocalStateForNamedTopology("topology1");
        MatcherAssert.assertThat((Object)taskDir.exists(), (Matcher)CoreMatchers.is((Object)false));
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldRemoveEmptyNamedTopologyDirsWhenCallingClearLocalStateForNamedTopology() throws IOException {
        this.initializeStateDirectory(true, true);
        String topologyName = "topology1";
        File namedTopologyDir = new File(this.appDir, "__topology1__");
        MatcherAssert.assertThat((Object)namedTopologyDir.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        this.directory.clearLocalStateForNamedTopology("topology1");
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldNotRemoveDirsThatDoNotMatchNamedTopologyDirsWhenCallingClean() throws IOException {
        this.initializeStateDirectory(true, true);
        File someDir = new File(this.appDir, "_not-a-valid-named-topology_dir_name_");
        MatcherAssert.assertThat((Object)someDir.mkdir(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)someDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        this.directory.clean();
        MatcherAssert.assertThat((Object)someDir.exists(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldCleanupObsoleteTaskDirectoriesInNamedTopologiesAndDeleteTheParentDirectories() throws IOException {
        this.initializeStateDirectory(true, true);
        File taskDir = this.directory.getOrCreateDirectoryForTask(new TaskId(2, 0, "topology1"));
        File namedTopologyDir = new File(this.appDir, "__topology1__");
        MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)taskDir.exists(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertTrue((boolean)new File(taskDir, "store").mkdir());
        MatcherAssert.assertThat((Object)this.directory.listAllTaskDirectories().size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)this.directory.listNonEmptyTaskDirectories().size(), (Matcher)CoreMatchers.is((Object)1));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class);){
            this.time.sleep(5000L);
            this.directory.cleanRemovedTasks(0L);
            MatcherAssert.assertThat((Object)taskDir.exists(), (Matcher)CoreMatchers.is((Object)false));
            MatcherAssert.assertThat((Object)namedTopologyDir.exists(), (Matcher)CoreMatchers.is((Object)false));
            MatcherAssert.assertThat((Object)this.directory.listAllTaskDirectories().size(), (Matcher)CoreMatchers.is((Object)0));
            MatcherAssert.assertThat((Object)this.directory.listNonEmptyTaskDirectories().size(), (Matcher)CoreMatchers.is((Object)0));
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.containsString((String)"Deleting obsolete state directory")));
        }
    }

    @Test
    public void shouldPersistProcessIdAcrossRestart() {
        UUID processId = this.directory.initializeProcessId();
        this.directory.close();
        MatcherAssert.assertThat((Object)this.directory.initializeProcessId(), (Matcher)CoreMatchers.equalTo((Object)processId));
    }

    @Test
    public void shouldGetFreshProcessIdIfProcessFileDeleted() {
        UUID processId = this.directory.initializeProcessId();
        this.directory.close();
        File processFile = new File(this.appDir, "kafka-streams-process-metadata");
        MatcherAssert.assertThat((Object)processFile.exists(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)processFile.delete(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.directory.initializeProcessId(), (Matcher)CoreMatchers.not((Object)processId));
    }

    @Test
    public void shouldGetFreshProcessIdIfJsonUnreadable() throws Exception {
        File processFile = new File(this.appDir, "kafka-streams-process-metadata");
        Files.createFile(processFile.toPath(), new FileAttribute[0]);
        UUID processId = UUID.randomUUID();
        FileOutputStream fileOutputStream = new FileOutputStream(processFile);
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter((OutputStream)fileOutputStream, StandardCharsets.UTF_8));){
            writer.write(processId.toString());
            writer.flush();
            fileOutputStream.getFD().sync();
        }
        MatcherAssert.assertThat((Object)this.directory.initializeProcessId(), (Matcher)CoreMatchers.not((Object)processId));
    }

    @Test
    public void shouldReadFutureProcessFileFormat() throws Exception {
        File processFile = new File(this.appDir, "kafka-streams-process-metadata");
        ObjectMapper mapper = new ObjectMapper();
        UUID processId = UUID.randomUUID();
        mapper.writeValue(processFile, (Object)new FutureStateDirectoryProcessFile(processId, "some random junk"));
        MatcherAssert.assertThat((Object)this.directory.initializeProcessId(), (Matcher)CoreMatchers.equalTo((Object)processId));
    }

    private static class CreateTaskDirRunner
    implements Runnable {
        private final StateDirectory directory;
        private final TaskId taskId;
        private final AtomicBoolean passed;
        private File taskDirectory;

        private CreateTaskDirRunner(StateDirectory directory, TaskId taskId, AtomicBoolean passed) {
            this.directory = directory;
            this.taskId = taskId;
            this.passed = passed;
        }

        @Override
        public void run() {
            try {
                this.taskDirectory = this.directory.getOrCreateDirectoryForTask(this.taskId);
            }
            catch (ProcessorStateException error) {
                this.passed.set(false);
            }
        }
    }

    private static class FutureStateDirectoryProcessFile {
        @JsonProperty
        private final UUID processId;
        @JsonProperty
        private final String newField;

        public FutureStateDirectoryProcessFile() {
            this.processId = null;
            this.newField = null;
        }

        FutureStateDirectoryProcessFile(UUID processId, String newField) {
            this.processId = processId;
            this.newField = newField;
        }
    }
}

