public class ProcessorStateManager
extends java.lang.Object
Constructor and Description |
---|
ProcessorStateManager(TaskId taskId,
java.util.Collection<org.apache.kafka.common.TopicPartition> sources,
boolean isStandby,
StateDirectory stateDirectory,
java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic,
ChangelogReader changelogReader,
boolean eosEnabled,
org.apache.kafka.common.utils.LogContext logContext) |
Modifier and Type | Method and Description |
---|---|
java.io.File |
baseDir() |
void |
checkpoint(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> checkpointableOffsets) |
java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> |
checkpointed() |
void |
close(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> ackedOffsets)
Close all stores (even in case of failure). |
void |
flush() |
StateStore |
getGlobalStore(java.lang.String name) |
StateStore |
getStore(java.lang.String name) |
void |
register(StateStore store,
StateRestoreCallback stateRestoreCallback) |
void |
reinitializeStateStoresForPartitions(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions,
InternalProcessorContext processorContext) |
void |
reinitializeStateStoresForPartitions(org.slf4j.Logger log,
java.util.Map<java.lang.String,StateStore> stateStores,
java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic,
java.util.Collection<org.apache.kafka.common.TopicPartition> partitions,
InternalProcessorContext processorContext) |
static java.lang.String |
storeChangelogTopic(java.lang.String applicationId,
java.lang.String storeName,
java.lang.String internalStream) |
public ProcessorStateManager(TaskId taskId, java.util.Collection<org.apache.kafka.common.TopicPartition> sources, boolean isStandby, StateDirectory stateDirectory, java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic, ChangelogReader changelogReader, boolean eosEnabled, org.apache.kafka.common.utils.LogContext logContext) throws java.io.IOException
ProcessorStateException - if the task directory does not exist and could not be created
java.io.IOException - if any severe error happens while creating or locking the state directory
public static java.lang.String storeChangelogTopic(java.lang.String applicationId, java.lang.String storeName, java.lang.String internalStream)
public java.io.File baseDir()
public void register(StateStore store, StateRestoreCallback stateRestoreCallback)
public void reinitializeStateStoresForPartitions(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, InternalProcessorContext processorContext)
public java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> checkpointed()
public StateStore getStore(java.lang.String name)
public void flush()
public void close(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> ackedOffsets) throws ProcessorStateException
Close all stores (even in case of failure).
Log all exceptions and, at the end, re-throw the first exception that occurred.
ProcessorStateException - if any error happens when closing the state stores
public void checkpoint(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> checkpointableOffsets)
public StateStore getGlobalStore(java.lang.String name)
public void reinitializeStateStoresForPartitions(org.slf4j.Logger log, java.util.Map<java.lang.String,StateStore> stateStores, java.util.Map<java.lang.String,java.lang.String> storeToChangelogTopic, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, InternalProcessorContext processorContext)