Class StreamTask
- java.lang.Object
  - org.apache.kafka.streams.processor.internals.AbstractTask
    - org.apache.kafka.streams.processor.internals.StreamTask
- All Implemented Interfaces:
ProcessorNodePunctuator, Task
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
A StreamTask is associated with a PartitionGroup, and is assigned to a StreamThread for processing.
-
Nested Class Summary
Nested Classes
static interface StreamTask.ProducerSupplier
protected static class StreamTask.TaskMetrics
-
Constructor Summary
Constructors
StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, org.apache.kafka.common.metrics.Sensor closeSensor)
StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, RecordCollector recordCollector, org.apache.kafka.common.metrics.Sensor closeSensor)
-
Method Summary
Modifier and Type, Method, and Description
protected java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> activeTaskCheckpointableOffsets()
void addRecords(org.apache.kafka.common.TopicPartition partition, java.lang.Iterable<org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]>> records)
    Adds records to queues.
void close(boolean clean, boolean isZombie)
    - suspend(clean)
      - close topology
      - if (clean) commit()
        - flush state and producer
        - commit offsets
    - close state
      - if (clean) write checkpoint
    - if (eos) close producer
void closeSuspended(boolean clean, boolean isZombie, java.lang.RuntimeException firstException)
void commit()
    - flush state and producer
    - if (!eos) write checkpoint
    - commit offsets and start new transaction
protected void flushState()
boolean initializeStateStores()
    Initialize the task and return true if the task is ready to run, i.e., it has no state stores.
void initializeTopology()
    - (re-)initialize the topology of the task
boolean maybePunctuateStreamTime()
    Possibly trigger registered stream-time punctuation functions if the current partition group timestamp has reached the defined stamp. Note that this is only called in the presence of new records.
boolean maybePunctuateSystemTime()
    Possibly trigger registered system-time punctuation functions if the current system timestamp has reached the defined stamp. Note that this is called irrespective of the presence of new records.
boolean process()
    Process one record.
void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator)
void resume()
    - resume the task
Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator)
    Schedules a punctuation for the processor.
void suspend()
    - close topology
    - commit()
      - flush state and producer
      - if (!eos) write checkpoint
      - commit offsets
-
Methods inherited from class org.apache.kafka.streams.processor.internals.AbstractTask
applicationId, changelogPartitions, commitNeeded, context, getStore, hasStateStores, id, isClosed, isEosEnabled, partitions, topology, toString, toString, updateOffsetLimits
-
Constructor Detail
-
StreamTask
public StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, org.apache.kafka.common.metrics.Sensor closeSensor)
-
StreamTask
public StreamTask(TaskId id, java.util.Collection<org.apache.kafka.common.TopicPartition> partitions, ProcessorTopology topology, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, StreamTask.ProducerSupplier producerSupplier, RecordCollector recordCollector, org.apache.kafka.common.metrics.Sensor closeSensor)
-
Method Detail
-
initializeStateStores
public boolean initializeStateStores()
Description copied from interface: Task
Initialize the task and return true if the task is ready to run, i.e., it has no state stores.
- Specified by:
initializeStateStores in interface Task
- Returns:
true if this task has no state stores that may need restoring.
-
initializeTopology
public void initializeTopology()
- (re-)initialize the topology of the task
- Specified by:
initializeTopology in interface Task
- Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
-
process
public boolean process()
Process one record.
- Returns:
true if this method processes a record, false if it does not process a record.
- Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
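The return value of process() lends itself to a simple drain loop. The following is a minimal sketch of that calling pattern only; ProcessLoopSketch, its queue, and drain() are hypothetical stand-ins and do not reflect how StreamTask or StreamThread are actually implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a task; NOT the real StreamTask.
final class ProcessLoopSketch {
    private final Deque<String> buffered = new ArrayDeque<>();
    private int processed = 0;

    // Buffers a record for later processing.
    void add(String record) {
        buffered.addLast(record);
    }

    // Mirrors the documented contract: handles at most one record per call
    // and reports whether a record was processed.
    boolean process() {
        String record = buffered.pollFirst();
        if (record == null) {
            return false; // nothing buffered, so nothing was processed
        }
        processed++;
        return true;
    }

    int processedCount() {
        return processed;
    }

    // Calls process() until it reports that no record was processed.
    static int drain(ProcessLoopSketch task) {
        int n = 0;
        while (task.process()) {
            n++;
        }
        return n;
    }
}
```

A caller would typically stop polling a task once process() returns false and move on to other work.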
-
punctuate
public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator)
- Specified by:
punctuate in interface ProcessorNodePunctuator
- Throws:
java.lang.IllegalStateException - if the current node is not null
TaskMigratedException - if the task producer got fenced (EOS only)
-
commit
public void commit()
- flush state and producer
- if (!eos) write checkpoint
- commit offsets and start new transaction
- Specified by:
commit in interface Task
- Throws:
TaskMigratedException - if committing offsets failed (non-EOS) or if the task producer got fenced (EOS)
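The ordering of the commit steps listed above can be illustrated with a small model. This is a sketch under stated assumptions: CommitOrderSketch and commitSteps() are hypothetical names, the steps are recorded as strings rather than executed, and the eos flag stands for "exactly-once semantics enabled".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the documented commit order; it performs no real I/O.
final class CommitOrderSketch {

    // Returns the documented steps in the order they are listed.
    // eos: whether exactly-once semantics is enabled (assumed meaning of "eos").
    static List<String> commitSteps(boolean eos) {
        List<String> steps = new ArrayList<>();
        steps.add("flush state and producer");
        if (!eos) {
            // Per the description, the checkpoint is written only when EOS is off.
            steps.add("write checkpoint");
        }
        steps.add("commit offsets and start new transaction");
        return steps;
    }

    public static void main(String[] args) {
        System.out.println("non-EOS: " + commitSteps(false));
        System.out.println("EOS:     " + commitSteps(true));
    }
}
```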
-
activeTaskCheckpointableOffsets
protected java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> activeTaskCheckpointableOffsets()
- Overrides:
activeTaskCheckpointableOffsets in class AbstractTask
-
flushState
protected void flushState()
-
suspend
public void suspend()
- close topology
- commit()
  - flush state and producer
  - if (!eos) write checkpoint
  - commit offsets
- Specified by:
suspend in interface Task
- Throws:
TaskMigratedException - if committing offsets failed (non-EOS) or if the task producer got fenced (EOS)
-
closeSuspended
public void closeSuspended(boolean clean, boolean isZombie, java.lang.RuntimeException firstException)
- Specified by:
closeSuspended in interface Task
-
close
public void close(boolean clean, boolean isZombie)
- suspend(clean)
  - close topology
  - if (clean) commit()
    - flush state and producer
    - commit offsets
- close state
  - if (clean) write checkpoint
- if (eos) close producer
- Specified by:
close in interface Task
- Parameters:
clean - shut down cleanly (i.e., including flush and commit) if true; otherwise, just close open resources
isZombie - true if this task is a zombie, false otherwise (this will suppress TaskMigratedException)
- Throws:
TaskMigratedException - if committing offsets failed (non-EOS) or if the task producer got fenced (EOS)
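How the clean flag and the EOS setting shape the close sequence can be sketched as a pure function. CloseOrderSketch and closeSteps() are hypothetical; the grouping of steps under suspend(clean) is one reading of the description above, and the isZombie parameter is deliberately left out because its effect is limited to exception handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the documented close order; it closes nothing real.
final class CloseOrderSketch {

    // clean: shut down cleanly (flush and commit) if true.
    // eos:   whether exactly-once semantics is enabled (assumed meaning of "eos").
    static List<String> closeSteps(boolean clean, boolean eos) {
        List<String> steps = new ArrayList<>();
        // suspend(clean)
        steps.add("close topology");
        if (clean) {
            // commit()
            steps.add("flush state and producer");
            steps.add("commit offsets");
        }
        // close state
        steps.add("close state");
        if (clean) {
            steps.add("write checkpoint");
        }
        if (eos) {
            steps.add("close producer");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println("clean, non-EOS: " + closeSteps(true, false));
        System.out.println("unclean, EOS:   " + closeSteps(false, true));
    }
}
```

In this model an unclean close skips both the commit and the checkpoint and only releases resources, which matches the description of the clean parameter.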
-
addRecords
public void addRecords(org.apache.kafka.common.TopicPartition partition, java.lang.Iterable<org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]>> records)
Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped and not added to the queue for processing.
- Parameters:
partition - the partition
records - the records
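The skip rule for invalid timestamps can be shown with a simplified queue. This is a minimal sketch, not the actual implementation: AddRecordsSketch and its nested Rec type are hypothetical, and the partition argument is omitted to keep the example small.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the record queue; NOT the real StreamTask.
final class AddRecordsSketch {

    // Hypothetical minimal record: only the timestamp matters for this rule.
    static final class Rec {
        final long timestamp;
        final String value;

        Rec(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Deque<Rec> queue = new ArrayDeque<>();

    // Queues every record with a non-negative timestamp and skips the rest.
    // Returns the number of records that were actually queued.
    int addRecords(Iterable<Rec> records) {
        int added = 0;
        for (Rec r : records) {
            if (r.timestamp < 0) {
                continue; // invalid (negative) timestamp: skip the record
            }
            queue.addLast(r);
            added++;
        }
        return added;
    }

    int size() {
        return queue.size();
    }
}
```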
-
schedule
public Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator)
Schedules a punctuation for the processor.
- Parameters:
interval - the interval in milliseconds
type - the punctuation type
- Throws:
java.lang.IllegalStateException - if the current node is not null
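Because schedule returns a Cancellable, callers can keep the handle and cancel the punctuation later. The sketch below models that pattern with local stand-ins: ScheduleSketch, CancelHandle, and Entry are hypothetical, a Runnable replaces the Punctuator, and the punctuation type is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "schedule returns a cancel handle"; NOT the Kafka API.
final class ScheduleSketch {

    // Local stand-in for a cancellable handle.
    interface CancelHandle {
        void cancel();
    }

    // One registered schedule: its interval, its action, and a cancelled flag.
    static final class Entry {
        final long intervalMs;
        final Runnable action;
        boolean cancelled;

        Entry(long intervalMs, Runnable action) {
            this.intervalMs = intervalMs;
            this.action = action;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    // Registers the action and returns a handle that marks it as cancelled.
    CancelHandle schedule(long intervalMs, Runnable action) {
        final Entry e = new Entry(intervalMs, action);
        entries.add(e);
        return () -> {
            e.cancelled = true;
        };
    }

    // Counts the schedules that have not been cancelled.
    int activeCount() {
        int n = 0;
        for (Entry e : entries) {
            if (!e.cancelled) {
                n++;
            }
        }
        return n;
    }
}
```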
-
maybePunctuateStreamTime
public boolean maybePunctuateStreamTime()
Possibly trigger registered stream-time punctuation functions if the current partition group timestamp has reached the defined stamp. Note that this is only called in the presence of new records.
- Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
-
maybePunctuateSystemTime
public boolean maybePunctuateSystemTime()
Possibly trigger registered system-time punctuation functions if the current system timestamp has reached the defined stamp. Note that this is called irrespective of the presence of new records.
- Throws:
TaskMigratedException - if the task producer got fenced (EOS only)
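Both maybePunctuate methods are described as the same check against different clocks: the partition group timestamp for stream time and the system clock for system time. A minimal sketch of that check follows; PunctuationSketch is hypothetical, and advancing the next stamp to "timestamp + interval" after firing is an assumption made for illustration, not a statement about the real scheduling logic.

```java
// Hypothetical model of "fire when the timestamp has reached the stamp".
final class PunctuationSketch {
    private final long intervalMs;
    private long nextStamp;
    private int fired = 0;

    PunctuationSketch(long startMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextStamp = startMs + intervalMs;
    }

    // Fires if the supplied timestamp has reached the next stamp.
    // The caller decides which clock supplies the timestamp:
    // stream time (advances only with records) or system time.
    boolean maybePunctuate(long timestampMs) {
        if (timestampMs < nextStamp) {
            return false; // stamp not reached yet
        }
        fired++;
        // Assumption: the next stamp is measured from the firing timestamp.
        nextStamp = timestampMs + intervalMs;
        return true;
    }

    int firedCount() {
        return fired;
    }
}
```

With stream time, the timestamp passed in only moves forward when new records arrive, which is why the stream-time variant is only called in the presence of new records.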
-