Class ProcessorStateManager


  • public class ProcessorStateManager
    extends java.lang.Object
    ProcessorStateManager is the source of truth for the current offset of each state store: the read offset during restoration, or the written offset during normal processing. The offset is initialized to null when the state store is registered, and can then be updated by loading the checkpoint file, by restoring the state store, or from the record collector's written offsets. When checkpointing, an offset is written to the file only if it is not null. The manager is also responsible for restoring state stores via their registered restore callbacks, which are used both for updating standby tasks and for restoring active tasks.
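The offset lifecycle described above can be illustrated with a minimal sketch. The class and method names below (`OffsetTrackerSketch`, `register`, `update`, `checkpointable`) are hypothetical and are not part of ProcessorStateManager. The sketch only models the rule that an offset starts as null and is included in a checkpoint only once it has been set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the offset bookkeeping; not the Kafka class.
class OffsetTrackerSketch {
    // Store name -> current offset; null means the offset is not known yet.
    private final Map<String, Long> offsets = new LinkedHashMap<>();

    // Registering a store initializes its offset to null.
    void register(String storeName) {
        offsets.put(storeName, null);
    }

    // Called after loading a checkpoint, restoring, or writing records.
    void update(String storeName, long offset) {
        if (!offsets.containsKey(storeName)) {
            throw new IllegalArgumentException("Store not registered: " + storeName);
        }
        offsets.put(storeName, offset);
    }

    // Only non-null offsets would be written to the checkpoint file.
    Map<String, Long> checkpointable() {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (entry.getValue() != null) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

In this sketch, a store that has been registered but never updated is simply left out of the checkpoint.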
    • Constructor Detail

      • ProcessorStateManager

        public ProcessorStateManager(TaskId taskId,
                                     Task.TaskType taskType,
                                     boolean eosEnabled,
                                     org.apache.kafka.common.utils.LogContext logContext,
                                     StateDirectory stateDirectory,
                                     ChangelogRegister changelogReader,
                                     java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic,
                                     java.util.Collection<org.apache.kafka.common.TopicPartition> sourcePartitions)
                              throws ProcessorStateException
        Throws:
        ProcessorStateException - if the task directory does not exist and could not be created
    • Method Detail

      • storeChangelogTopic

        public static java.lang.String storeChangelogTopic(java.lang.String applicationId,
                                                           java.lang.String storeName,
                                                           java.lang.String internalStream)
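This page does not document how the changelog topic name is built. As a point of comparison, upstream Apache Kafka Streams names changelog topics `<applicationId>-<storeName>-changelog`. The sketch below shows only that convention. The class and method names are hypothetical, and the role of the `internalStream` parameter is not covered because it is not described here.

```java
// Hypothetical helper illustrating the upstream Kafka Streams naming convention.
// It is an assumption that this class follows the same convention.
class ChangelogTopicNameSketch {
    static final String CHANGELOG_SUFFIX = "-changelog";

    // Builds "<applicationId>-<storeName>-changelog".
    static String upstreamStyleName(String applicationId, String storeName) {
        return applicationId + "-" + storeName + CHANGELOG_SUFFIX;
    }
}
```

For example, under this assumption an application id of `orders-app` and a store named `counts` would give `orders-app-counts-changelog`.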
      • getGlobalStore

        public StateStore getGlobalStore(java.lang.String name)
      • baseDir

        public java.io.File baseDir()
      • getStore

        public StateStore getStore(java.lang.String name)
      • changelogOffsets

        public java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> changelogOffsets()
      • flush

        public void flush()
        Throws:
        TaskMigratedException - recoverable error while sending changelog records; it causes the task to be removed
        StreamsException - fatal error when flushing the state store, for example when sending changelog records fails or flushing a state store hits an I/O error; such an error should cause the thread to die
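A caller of flush() has to tell the recoverable case apart from the fatal one. The sketch below shows one way to do that. It uses hypothetical stand-in exception classes so that it compiles without Kafka on the classpath, and it assumes, as in upstream Apache Kafka, that TaskMigratedException is a subclass of StreamsException, which is why the more specific catch clause comes first.

```java
// Hypothetical stand-ins for StreamsException and TaskMigratedException.
class FatalStreamsErrorSketch extends RuntimeException {
    FatalStreamsErrorSketch(String message) {
        super(message);
    }
}

// Assumed to be a subclass of the fatal error, mirroring upstream Kafka.
class TaskMigratedErrorSketch extends FatalStreamsErrorSketch {
    TaskMigratedErrorSketch(String message) {
        super(message);
    }
}

class FlushHandlingSketch {
    enum Outcome { FLUSHED, TASK_REMOVED, THREAD_DIES }

    // Runs the flush action and classifies the result for the caller.
    static Outcome flushAndClassify(Runnable flush) {
        try {
            flush.run();
            return Outcome.FLUSHED;
        } catch (TaskMigratedErrorSketch e) {
            // Recoverable: the task is removed, the thread keeps running.
            return Outcome.TASK_REMOVED;
        } catch (FatalStreamsErrorSketch e) {
            // Fatal: the thread should shut down.
            return Outcome.THREAD_DIES;
        }
    }
}
```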
      • close

        public void close()
                   throws ProcessorStateException
        Closes all stores, continuing even if closing one of them fails. All exceptions are logged, and the first exception that occurred is re-thrown after every store has been processed.
        Throws:
        ProcessorStateException - if any error happens when closing the state stores
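The close-everything-then-rethrow behaviour can be sketched as follows. This is an illustration of the pattern, not the actual implementation: `CloseAllSketch` is a hypothetical name, `AutoCloseable` stands in for the state store type, a plain list stands in for the logger, and `IllegalStateException` stands in for ProcessorStateException.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "close all, log all, re-throw the first".
class CloseAllSketch {
    static void closeAll(Map<String, AutoCloseable> stores, List<String> log) {
        RuntimeException firstException = null;
        for (Map.Entry<String, AutoCloseable> entry : stores.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception e) {
                // Every failure is logged, but the loop keeps going.
                log.add("Failed to close " + entry.getKey() + ": " + e.getMessage());
                if (firstException == null) {
                    // Only the first failure is remembered for re-throwing.
                    firstException = new IllegalStateException(
                        "Failed to close " + entry.getKey(), e);
                }
            }
        }
        // Re-throw only after every store has had a chance to close.
        if (firstException != null) {
            throw firstException;
        }
    }
}
```

Deferring the throw until after the loop means that one failing store cannot stop the remaining stores from being closed.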
      • checkpoint

        public void checkpoint(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> writtenOffsets)