Class ProcessorStateManager
- java.lang.Object
  - org.apache.kafka.streams.processor.internals.ProcessorStateManager
public class ProcessorStateManager extends java.lang.Object
-
Constructor Summary
Constructor
ProcessorStateManager(TaskId taskId, java.util.Collection<org.apache.kafka.common.TopicPartition> sources, boolean isStandby, StateDirectory stateDirectory, java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic, ChangelogReader changelogReader, boolean eosEnabled, org.apache.kafka.common.utils.LogContext logContext)
-
Method Summary
Modifier and Type Method Description
java.io.File
baseDir()
void
checkpoint(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> checkpointableOffsets)
java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long>
checkpointed()
void
close(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> ackedOffsets)
Close all stores (even in case of failure).
void
flush()
StateStore
getGlobalStore(java.lang.String name)
StateStore
getStore(java.lang.String name)
void
register(StateStore store, StateRestoreCallback stateRestoreCallback)
void
reinitializeStateStoresForPartitions(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, InternalProcessorContext processorContext)
void
reinitializeStateStoresForPartitions(org.slf4j.Logger log, java.util.Map<java.lang.String,StateStore> stateStores, java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, InternalProcessorContext processorContext)
static java.lang.String
storeChangelogTopic(java.lang.String applicationId, java.lang.String storeName, java.lang.String internalStream)
-
Constructor Detail
-
ProcessorStateManager
public ProcessorStateManager(TaskId taskId, java.util.Collection<org.apache.kafka.common.TopicPartition> sources, boolean isStandby, StateDirectory stateDirectory, java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic, ChangelogReader changelogReader, boolean eosEnabled, org.apache.kafka.common.utils.LogContext logContext) throws java.io.IOException
- Throws:
ProcessorStateException
- if the task directory does not exist and could not be created
java.io.IOException
- if any severe error happens while creating or locking the state directory
-
Method Detail
-
storeChangelogTopic
public static java.lang.String storeChangelogTopic(java.lang.String applicationId, java.lang.String storeName, java.lang.String internalStream)
-
baseDir
public java.io.File baseDir()
-
register
public void register(StateStore store, StateRestoreCallback stateRestoreCallback)
-
reinitializeStateStoresForPartitions
public void reinitializeStateStoresForPartitions(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, InternalProcessorContext processorContext)
-
checkpointed
public java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> checkpointed()
-
getStore
public StateStore getStore(java.lang.String name)
-
flush
public void flush()
-
close
public void close(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> ackedOffsets) throws ProcessorStateException
Close all stores (even in case of failure). Log all exceptions and re-throw the first exception that occurred at the end.
- Throws:
ProcessorStateException
- if any error happens when closing the state stores
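The behavior described for close (close every store even if one fails, log each failure, re-throw the first exception at the end) can be illustrated with a minimal, self-contained sketch. This is not the Kafka implementation: SimpleStore, closeAll, store, and the failureLog list are hypothetical names introduced only for this example, and a plain list stands in for the logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CloseAllSketch {

    // Hypothetical stand-in for a store; NOT Kafka's StateStore interface.
    interface SimpleStore {
        String name();
        void close();
    }

    // Attempts to close every store, records each failure,
    // and re-throws the first failure only after all stores were tried.
    static void closeAll(List<SimpleStore> stores, List<String> failureLog) {
        RuntimeException firstException = null;
        for (SimpleStore store : stores) {
            try {
                store.close();
            } catch (RuntimeException e) {
                // Stand-in for logging: note which store failed and why.
                failureLog.add(store.name() + ": " + e.getMessage());
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    // Hypothetical helper that builds a store which optionally fails on close.
    static SimpleStore store(String name, boolean failOnClose, List<String> closed) {
        return new SimpleStore() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public void close() {
                if (failOnClose) {
                    throw new IllegalStateException("cannot close " + name);
                }
                closed.add(name);
            }
        };
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        List<String> failureLog = new ArrayList<>();
        List<SimpleStore> stores = Arrays.asList(
                store("a", true, closed),
                store("b", false, closed),
                store("c", true, closed));
        try {
            closeAll(stores, failureLog);
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
        System.out.println("closed: " + closed);
        System.out.println("failures: " + failureLog);
    }
}
```

In this sketch the failure of store "a" does not prevent "b" and "c" from being attempted; both failures are recorded, and only the exception from "a" propagates to the caller.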
-
checkpoint
public void checkpoint(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> checkpointableOffsets)
-
getGlobalStore
public StateStore getGlobalStore(java.lang.String name)
-
reinitializeStateStoresForPartitions
public void reinitializeStateStoresForPartitions(org.slf4j.Logger log, java.util.Map<java.lang.String,StateStore> stateStores, java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, InternalProcessorContext processorContext)
-