public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
A StreamTask is associated with a PartitionGroup, and is assigned to a StreamThread for processing.
Modifier and Type | Class and Description |
---|---|
static interface | StreamTask.ProducerSupplier |
protected static class | StreamTask.TaskMetrics |
Constructor and Description |
---|
StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, RecordCollector recordCollector, org.apache.kafka.common.metrics.Sensor closeSensor) |
StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, org.apache.kafka.common.metrics.Sensor closeSensor) |
Modifier and Type | Method and Description |
---|---|
protected java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> | activeTaskCheckpointableOffsets() |
void | addRecords(org.apache.kafka.common.TopicPartition partition, java.lang.Iterable<org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]>> records): Adds records to queues. |
void | close(boolean clean, boolean isZombie): suspend(clean); close topology; if (clean) commit(); flush state and producer; commit offsets; close state; if (clean) write checkpoint; if (eos) close producer |
void | closeSuspended(boolean clean, boolean isZombie, java.lang.RuntimeException firstException) |
void | commit(): flush state and producer; if (!eos) write checkpoint; commit offsets and start new transaction |
protected void | flushState(): Flush all state stores owned by this task. |
boolean | initializeStateStores(): Initialize the task and return true if the task is ready to run, i.e., it has no state stores. |
void | initializeTopology(): (Re-)initialize the topology of the task. |
boolean | maybePunctuateStreamTime(): Possibly trigger registered stream-time punctuation functions if the current partition group timestamp has reached the defined stamp. Note that this is only called in the presence of new records. |
boolean | maybePunctuateSystemTime(): Possibly trigger registered system-time punctuation functions if the current system timestamp has reached the defined stamp. Note that this is called irrespective of the presence of new records. |
boolean | process(): Process one record. |
void | punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator) |
void | resume(): Resume the task. |
Cancellable | schedule(long interval, PunctuationType type, Punctuator punctuator): Schedules a punctuation for the processor. |
void | suspend(): close topology; commit(); flush state and producer; if (!eos) write checkpoint; commit offsets |
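The addRecords and process entries in the summary above form a pair: one buffers incoming records per partition, the other consumes a single record per call. The following is a minimal sketch of that interplay, not the Kafka implementation. The class RecordQueuesSketch, the simplified Rec type, and the string partition keys are hypothetical. Two behaviors are assumptions that this page does not state: the queue whose head record has the lowest timestamp is served first, and the boolean result reports whether a record was processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-partition buffering; not the Kafka implementation.
class RecordQueuesSketch {

    // Simplified record. The real method takes ConsumerRecord<byte[], byte[]>.
    static final class Rec {
        final long timestamp;
        final String value;

        Rec(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // One FIFO queue per partition, keyed here by a plain partition name.
    private final Map<String, Deque<Rec>> queues = new LinkedHashMap<>();

    // Values in the order they were processed, kept only for inspection.
    final List<String> processed = new ArrayList<>();

    // Counterpart of addRecords(partition, records): append to that partition's queue.
    void addRecords(String partition, Iterable<Rec> records) {
        Deque<Rec> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        for (Rec record : records) {
            queue.addLast(record);
        }
    }

    // Counterpart of process(): handle at most one record per call.
    // Assumption: the queue whose head has the lowest timestamp goes first,
    // and the return value reports whether a record was processed.
    boolean process() {
        Deque<Rec> best = null;
        for (Deque<Rec> queue : queues.values()) {
            if (queue.isEmpty()) {
                continue;
            }
            if (best == null || queue.peekFirst().timestamp < best.peekFirst().timestamp) {
                best = queue;
            }
        }
        if (best == null) {
            return false;
        }
        processed.add(best.pollFirst().value);
        return true;
    }

    public static void main(String[] args) {
        RecordQueuesSketch task = new RecordQueuesSketch();
        task.addRecords("topic-0", Arrays.asList(new Rec(10L, "a"), new Rec(30L, "c")));
        task.addRecords("topic-1", Arrays.asList(new Rec(20L, "b")));
        while (task.process()) {
            // one record per call until every queue is drained
        }
        System.out.println(task.processed);
    }
}
```

A caller would typically loop on process() until it returns false, as main does here, so that buffered records from several partitions are interleaved by timestamp rather than drained one partition at a time.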
Methods inherited from class AbstractTask: applicationId, changelogPartitions, commitNeeded, context, getStore, hasStateStores, id, isClosed, isEosEnabled, partitions, topology, toString, toString, updateOffsetLimits
public StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, org.apache.kafka.common.metrics.Sensor closeSensor)
public StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, RecordCollector recordCollector, org.apache.kafka.common.metrics.Sensor closeSensor)
public boolean initializeStateStores()
Description copied from interface: Task
Initialize the task and return true if the task is ready to run, i.e., it has no state stores.
Specified by:
initializeStateStores in interface Task
public void initializeTopology()
- (re-)initialize the topology of the task
Specified by:
initializeTopology in interface Task
Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
public boolean process()
Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator)
Specified by:
punctuate in interface ProcessorNodePunctuator
Throws:
java.lang.IllegalStateException - if the current node is not null
TaskMigratedException - if the task producer got fenced (EOS only)
public void commit()
- flush state and producer
- if (!eos) write checkpoint
- commit offsets and start new transaction
Specified by:
commit in interface Task
Throws:
TaskMigratedException - if committing offsets failed (non-EOS) or if the task producer got fenced (EOS)
protected java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> activeTaskCheckpointableOffsets()
activeTaskCheckpointableOffsets in class AbstractTask
protected void flushState()
Description copied from class: AbstractTask
Flush all state stores owned by this task.
public void suspend()
- close topology
- commit()
- flush state and producer
- if (!eos) write checkpoint
- commit offsets
Specified by:
suspend in interface Task
Throws:
TaskMigratedException - if committing offsets failed (non-EOS) or if the task producer got fenced (EOS)
public void closeSuspended(boolean clean, boolean isZombie, java.lang.RuntimeException firstException)
Specified by:
closeSuspended in interface Task
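The step lists for commit() and suspend() above, and for close() below, build on one another: suspend runs a commit, and close runs a suspend. The following is a rough sketch of that ordering which only records step names and performs no real work. TaskLifecycleSketch and its private commit(boolean) helper are hypothetical, the isZombie parameter is left out, and starting a new transaction only when EOS is enabled is an assumption rather than something this page states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the documented step ordering; it only records step names.
class TaskLifecycleSketch {

    private final boolean eosEnabled;

    // Steps in the order they were executed.
    final List<String> steps = new ArrayList<>();

    TaskLifecycleSketch(boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    // commit(): flush, checkpoint unless EOS, commit offsets, start a new transaction.
    void commit() {
        commit(true);
    }

    // Hypothetical helper so suspend can commit without opening a new transaction.
    private void commit(boolean startNewTransaction) {
        steps.add("flush state and producer");
        if (!eosEnabled) {
            steps.add("write checkpoint");
        }
        steps.add("commit offsets");
        // Assumption: a transaction exists only when EOS is enabled.
        if (eosEnabled && startNewTransaction) {
            steps.add("start new transaction");
        }
    }

    // suspend(): close the topology, then commit.
    void suspend() {
        suspend(true);
    }

    private void suspend(boolean clean) {
        steps.add("close topology");
        if (clean) {
            commit(false);
        }
    }

    // close(clean, isZombie) without the isZombie flag: suspend, then release state.
    void close(boolean clean) {
        suspend(clean);
        steps.add("close state");
        if (clean) {
            steps.add("write checkpoint");
        }
        if (eosEnabled) {
            steps.add("close producer");
        }
    }

    public static void main(String[] args) {
        TaskLifecycleSketch eosTask = new TaskLifecycleSketch(true);
        eosTask.close(true);
        System.out.println(eosTask.steps);

        TaskLifecycleSketch plainTask = new TaskLifecycleSketch(false);
        plainTask.close(false);
        System.out.println(plainTask.steps);
    }
}
```

With EOS enabled, a clean close in this model reproduces the order given in the close() list: close topology, flush state and producer, commit offsets, close state, write checkpoint, close producer. An unclean close skips the commit and the checkpoint and only releases resources.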
public void close(boolean clean, boolean isZombie)
- suspend(clean)
- close topology
- if (clean) commit()
- flush state and producer
- commit offsets
- close state
- if (clean) write checkpoint
- if (eos) close producer
Specified by:
close in interface Task
Parameters:
clean - shut down cleanly (i.e., including flush and commit) if true; otherwise, just close open resources
isZombie - true if this task is a zombie (this will suppress TaskMigratedException)
Throws:
TaskMigratedException - if committing offsets failed (non-EOS) or if the task producer got fenced (EOS)
public void addRecords(org.apache.kafka.common.TopicPartition partition, java.lang.Iterable<org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]>> records)
Parameters:
partition - the partition
records - the records
public Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator)
Parameters:
interval - the interval in milliseconds
type - the punctuation type
Throws:
java.lang.IllegalStateException - if the current node is not null
public boolean maybePunctuateStreamTime()
Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
public boolean maybePunctuateSystemTime()
Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
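The two maybePunctuate methods differ only in which clock they consult: stream time advances with the timestamps of incoming records, while system time advances whether or not records arrive. The following is a minimal sketch of a single punctuation schedule driven by either clock. PunctuationScheduleSketch and its fields are hypothetical, and the rule that the next stamp moves forward by exactly one interval after firing is an assumption, not a description of Kafka's scheduling.

```java
// Hypothetical single-schedule model of a punctuation timer; not Kafka's implementation.
class PunctuationScheduleSketch {

    private final long intervalMs;

    // The "defined stamp": the earliest time at which the next punctuation may fire.
    private long nextFireMs;

    // Number of punctuations triggered so far.
    int fired = 0;

    PunctuationScheduleSketch(long startMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextFireMs = startMs + intervalMs;
    }

    // Fires at most once per call, and only if nowMs has reached the defined stamp.
    boolean maybePunctuate(long nowMs) {
        if (nowMs < nextFireMs) {
            return false;
        }
        fired++;
        // Assumption: the schedule advances by exactly one interval per firing.
        nextFireMs += intervalMs;
        return true;
    }

    public static void main(String[] args) {
        PunctuationScheduleSketch streamTime = new PunctuationScheduleSketch(0L, 100L);
        PunctuationScheduleSketch systemTime = new PunctuationScheduleSketch(0L, 100L);

        // Stream time only moves when records arrive; here two records, then silence.
        long[] recordTimestamps = {50L, 120L};
        for (long ts : recordTimestamps) {
            streamTime.maybePunctuate(ts);
        }

        // System time keeps moving after the last record.
        long[] wallClockReadings = {50L, 120L, 250L, 400L};
        for (long now : wallClockReadings) {
            systemTime.maybePunctuate(now);
        }

        System.out.println("stream-time punctuations: " + streamTime.fired);
        System.out.println("system-time punctuations: " + systemTime.fired);
    }
}
```

In this model the stream-time schedule stops firing once records stop arriving, while the system-time schedule continues, which matches the notes on the two methods about the presence of new records.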