Type Parameters:
K - the key type
S - the source's (parent's) value type
V - the value type

public class KTableImpl<K,S,V> extends AbstractStream<K,V> implements KTable<K,V>

The implementation class of KTable.

Fields inherited from class AbstractStream: builder, keySerde, name, sourceNodes, streamsGraphNode, valSerde
Constructor and Description |
---|
KTableImpl(java.lang.String name, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valSerde, java.util.Set<java.lang.String> sourceNodes, java.lang.String queryableStoreName, boolean isQueryable, ProcessorSupplier<?,?> processorSupplier, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder) |
Modifier and Type | Method and Description |
---|---|
KTable<K,V> | filter(Predicate<? super K,? super V> predicate): Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. |
KTable<K,V> | filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. |
KTable<K,V> | filterNot(Predicate<? super K,? super V> predicate): Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. |
KTable<K,V> | filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. |
<K1,V1> KGroupedTable<K1,V1> | groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector): Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. |
<K1,V1> KGroupedTable<K1,V1> | groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Grouped<K1,V1> grouped): Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped. |
<K1,V1> KGroupedTable<K1,V1> | groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Serialized<K1,V1> serialized): Deprecated. |
<V1,R> KTable<K,R> | join(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner): Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. |
<VO,VR> KTable<K,VR> | join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. |
<V1,R> KTable<K,R> | leftJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner): Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. |
<VO,VR> KTable<K,VR> | leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. |
<VR> KTable<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper): Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. |
<VR> KTable<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. |
<VR> KTable<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper): Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. |
<VR> KTable<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. |
<V1,R> KTable<K,R> | outerJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner): Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. |
<VO,VR> KTable<K,VR> | outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. |
java.lang.String | queryableStoreName(): Get the name of the local state store that can be used to query this KTable. |
KTable<K,V> | suppress(Suppressed<? super K> suppressed): Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. |
KStream<K,V> | toStream(): Convert this changelog stream to a KStream. |
<K1> KStream<K1,V> | toStream(KeyValueMapper<? super K,? super V,? extends K1> mapper): Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key. |
<VR> KTable<K,VR> | transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, java.lang.String... stateStoreNames): Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. |
<VR> KTable<K,VR> | transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, java.lang.String... stateStoreNames): Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. |
Methods inherited from class AbstractStream: internalTopologyBuilder, keySerde, valueSerde
public KTableImpl(java.lang.String name, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valSerde, java.util.Set<java.lang.String> sourceNodes, java.lang.String queryableStoreName, boolean isQueryable, ProcessorSupplier<?,?> processorSupplier, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder)
public java.lang.String queryableStoreName()
Get the name of the local state store that can be used to query this KTable.
Specified by: queryableStoreName in interface KTable<K,V>
Returns: the underlying state store name, or null if this KTable cannot be queried.
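A short sketch of using the returned name to query the table (assumes a running KafkaStreams instance named streams; this is an illustration, not part of the original documentation):

String storeName = table.queryableStoreName(); // null if this KTable cannot be queried
if (storeName != null) {
    ReadOnlyKeyValueStore<K, V> store = streams.store(storeName, QueryableStoreTypes.<K, V>keyValueStore());
}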
public KTable<K,V> filter(Predicate<? super K,? super V> predicate)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
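A minimal sketch (topic name, key and value types are illustrative assumptions):

KTable<String, Long> table = builder.table("topic");
KTable<String, Long> positives = table.filter((key, value) -> value > 0);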
public KTable<K,V> filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
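A sketch of supplying the store through Materialized (store name and serdes are illustrative assumptions):

KTable<String, Long> positives = table.filter(
    (key, value) -> value > 0,
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("positives-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));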
To query the local KeyValueStore it must be obtained via KafkaStreams#store(...):

KafkaStreams streams = ... // filtering words
ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
K key = "some-word";
V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
Specified by: filter in interface KTable<K,V>
Parameters:
predicate - a filter Predicate that is applied to each record
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns: a KTable that contains only those records that satisfy the given predicate
See Also: KTable.filterNot(Predicate, Materialized)
public KTable<K,V> filterNot(Predicate<? super K,? super V> predicate)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
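A minimal sketch (types are illustrative assumptions):

KTable<String, Long> nonPositives = table.filterNot((key, value) -> value > 0);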
public KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
To query the local KeyValueStore it must be obtained via KafkaStreams#store(...):

KafkaStreams streams = ... // filtering words
ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
K key = "some-word";
V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
Specified by: filterNot in interface KTable<K,V>
Parameters:
predicate - a filter Predicate that is applied to each record
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns: a KTable that contains only those records that do not satisfy the given predicate
See Also: KTable.filter(Predicate, Materialized)
public <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.

KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
Specified by: mapValues in interface KTable<K,V>
Type Parameters: VR - the value type of the result KTable
Parameters: mapper - a ValueMapper that computes a new output value
Returns: a KTable that contains records with unmodified keys and new values (possibly of different type)

public <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapperWithKey is applied to the value of the update record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the combined number of tokens in the key and value strings.

KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
Specified by: mapValues in interface KTable<K,V>
Type Parameters: VR - the value type of the result KTable
Parameters: mapper - a ValueMapperWithKey that computes a new output value
Returns: a KTable that contains records with unmodified keys and new values (possibly of different type)

public <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.

KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...).
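A sketch mirroring the query example shown for filter above (key and value types follow the serdes configured via Materialized):

KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore<String, Integer> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Integer>keyValueStore());
String key = "some-word";
Integer valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)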
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
Specified by: mapValues in interface KTable<K,V>
Type Parameters: VR - the value type of the result KTable
Parameters:
mapper - a ValueMapper that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns: a KTable that contains records with unmodified keys and new values (possibly of different type)

public <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update the provided ValueMapperWithKey is applied to the value of the update record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the combined number of tokens in the key and value strings.

KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...).
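A sketch mirroring the query example shown for filter above (key and value types follow the serdes configured via Materialized):

KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore<String, Integer> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Integer>keyValueStore());
String key = "some-word";
Integer valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)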
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
Specified by: mapValues in interface KTable<K,V>
Type Parameters: VR - the value type of the result KTable
Parameters:
mapper - a ValueMapperWithKey that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns: a KTable that contains records with unmodified keys and new values (possibly of different type)

public <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, java.lang.String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to KTable.mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

If the downstream topology uses aggregation functions (e.g. KGroupedTable.reduce(Reducer, Reducer, Materialized), KGroupedTable.aggregate(Initializer, Aggregator, Aggregator, Materialized), etc.), care must be taken when dealing with state (either held in state-stores or transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. KTable.transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.
In order to assign a state store, the state store must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);
KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
ValueTransformerWithKey get() {
return new ValueTransformerWithKey() {
private KeyValueStore<String, String> state;
void init(ProcessorContext context) {
this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
}
NewValueType transform(K readOnlyKey, V value) {
// can access this.state and use read-only key
return new NewValueType(readOnlyKey); // or null
}
void close() {
// can access this.state
}
}
}
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
Specified by: transformValues in interface KTable<K,V>
Type Parameters: VR - the value type of the result table
Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
stateStoreNames - the names of the state stores used by the processor
Returns: a KTable that contains records with unmodified key and new values (possibly of different type)
See Also: KTable.mapValues(ValueMapper), KTable.mapValues(ValueMapperWithKey)
public <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, java.lang.String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to KTable.mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

The resulting KTable is materialized into another state store (additional to the provided state store names) as specified by the user via the Materialized parameter, and is queryable through its given name.
In order to assign a state store, the state store must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);
KTable outputTable = inputTable.transformValues(
new ValueTransformerWithKeySupplier() { ... },
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
"myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
ValueTransformerWithKey get() {
return new ValueTransformerWithKey() {
private KeyValueStore<String, String> state;
void init(ProcessorContext context) {
this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
}
NewValueType transform(K readOnlyKey, V value) {
// can access this.state and use read-only key
return new NewValueType(readOnlyKey); // or null
}
void close() {
// can access this.state
}
}
}
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
Specified by: transformValues in interface KTable<K,V>
Type Parameters: VR - the value type of the result table
Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
stateStoreNames - the names of the state stores used by the processor
Returns: a KTable that contains records with unmodified key and new values (possibly of different type)
See Also: KTable.mapValues(ValueMapper), KTable.mapValues(ValueMapperWithKey)
public <K1> KStream<K1,V> toStream(KeyValueMapper<? super K,? super V,? extends K1> mapper)
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
For example, you can compute the new key as the length of the value string.
KTable<String, String> table = builder.table("topic");
KTable<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
Integer apply(String key, String value) {
return value.length();
}
});
Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.

This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).

Note that KTable.toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
public KTable<K,V> suppress(Suppressed<? super K> suppressed)
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. This controls what updates downstream table and stream operations will receive.
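A sketch that rate-limits updates per key (time limit and buffer bound are illustrative assumptions):

KTable<String, Long> rateLimited = table.suppress(
    Suppressed.untilTimeLimit(Duration.ofMinutes(5),
                              Suppressed.BufferConfig.maxRecords(1000).emitEarlyWhenFull()));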
public <V1,R> KTable<K,R> join(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with null key will be dropped and no join computation is performed.
Example:

thisKTable | thisState | otherKTable | otherState | result updated record
---|---|---|---|---
<K1:A> | <K1:A> | | |
| <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)>
<K1:C> | <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)>
| <K1:C> | <K1:null> | | <K1:null>
Specified by: join in interface KTable<K,V>
Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
See Also: KTable.leftJoin(KTable, ValueJoiner), KTable.outerJoin(KTable, ValueJoiner)
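A minimal sketch (topics and value types are illustrative assumptions):

KTable<String, Long> left = builder.table("left-topic");
KTable<String, Long> right = builder.table("right-topic");
KTable<String, Long> sums = left.join(right, (leftValue, rightValue) -> leftValue + rightValue);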
public <VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with null key will be dropped and no join computation is performed.
Example:

thisKTable | thisState | otherKTable | otherState | result updated record
---|---|---|---|---
<K1:A> | <K1:A> | | |
| <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)>
<K1:C> | <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)>
| <K1:C> | <K1:null> | | <K1:null>
Specified by: join in interface KTable<K,V>
Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
See Also: KTable.leftJoin(KTable, ValueJoiner, Materialized), KTable.outerJoin(KTable, ValueJoiner, Materialized)
public <V1,R> KTable<K,R> outerJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with null key will be dropped and no join computation is performed.
Example:

thisKTable | thisState | otherKTable | otherState | result updated record
---|---|---|---|---
<K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)>
| <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)>
<K1:null> | | | <K1:b> | <K1:ValueJoiner(null,b)>
| | <K1:null> | | <K1:null>
Specified by: outerJoin in interface KTable<K,V>
Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
See Also: KTable.join(KTable, ValueJoiner), KTable.leftJoin(KTable, ValueJoiner)
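A minimal sketch (types are illustrative; for non-matching records one side is null):

KTable<String, String> merged = left.outerJoin(right,
    (leftValue, rightValue) -> (leftValue == null ? "" : leftValue) + "|" + (rightValue == null ? "" : rightValue));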
public <VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with null key will be dropped and no join computation is performed.
Example:

thisKTable | thisState | otherKTable | otherState | result updated record
---|---|---|---|---
<K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)>
| <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)>
<K1:null> | | | <K1:b> | <K1:ValueJoiner(null,b)>
| | <K1:null> | | <K1:null>
Specified by: outerJoin in interface KTable<K,V>
Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
See Also: KTable.join(KTable, ValueJoiner), KTable.leftJoin(KTable, ValueJoiner)
public <V1,R> KTable<K,R> leftJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from the left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of the left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with null key will be dropped and no join computation is performed.
Example:

thisKTable | thisState | otherKTable | otherState | result updated record
---|---|---|---|---
<K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)>
| <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)>
<K1:null> | | | <K1:b> | <K1:null>
| | <K1:null> | |
Specified by: leftJoin in interface KTable<K,V>
Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of the left KTable
See Also: KTable.join(KTable, ValueJoiner), KTable.outerJoin(KTable, ValueJoiner)
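A minimal sketch (types are illustrative; rightValue is null when no matching record exists):

KTable<String, String> enriched = left.leftJoin(right,
    (leftValue, rightValue) -> rightValue == null ? leftValue : leftValue + ":" + rightValue);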
public <VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from the left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of the left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with null key will be dropped and no join computation is performed.
Example:

thisKTable | thisState | otherKTable | otherState | result updated record
---|---|---|---|---
<K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)>
| <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)>
<K1:null> | | | <K1:b> | <K1:null>
| | <K1:null> | |
Specified by: leftJoin in interface KTable<K,V>
Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of the left KTable
See Also: KTable.join(KTable, ValueJoiner, Materialized), KTable.outerJoin(KTable, ValueJoiner, Materialized)
public <K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector)
Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (both should have unmodified type). If the new record key is null the record will not be included in the resulting KGroupedTable.

Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.

If the key or value type is changed, it is recommended to use KTable.groupBy(KeyValueMapper, Grouped) instead.
Specified by: groupBy in interface KTable<K,V>
Type Parameters:
K1 - the key type of the result KGroupedTable
V1 - the value type of the result KGroupedTable
Parameters:
selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
Returns: a KGroupedTable that contains the re-grouped records of the original KTable
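A sketch that re-groups a per-user table by a derived attribute and sums the values (userCounts and regionOf are hypothetical):

KGroupedTable<String, Long> grouped = userCounts.groupBy((user, count) -> KeyValue.pair(regionOf(user), count));
KTable<String, Long> countsPerRegion = grouped.reduce((aggValue, newValue) -> aggValue + newValue, (aggValue, oldValue) -> aggValue - oldValue);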
@Deprecated public <K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Serialized<K1,V1> serialized)
Deprecated.
Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Serialized. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (both may be the same type or a new type). If the new record key is null the record will not be included in the resulting KGroupedTable.

Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
Specified by: groupBy in interface KTable<K,V>
Type Parameters:
K1 - the key type of the result KGroupedTable
V1 - the value type of the result KGroupedTable
Parameters:
selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
serialized - the Serialized instance used to specify Serdes
Returns: a KGroupedTable that contains the re-grouped records of the original KTable
public <K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Grouped<K1,V1> grouped)
Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (both may be the same type or a new type). If the new record key is null the record will not be included in the resulting KGroupedTable.

Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, and "<name>" is either provided via Grouped.as(String) or an internally generated name. You can retrieve all generated internal topic names via Topology.describe().

All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
Specified by: groupBy in interface KTable<K,V>
Type Parameters:
K1 - the key type of the result KGroupedTable
V1 - the value type of the result KGroupedTable
Parameters:
selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
grouped - the Grouped instance used to specify Serdes and the name for a repartition topic if repartitioning is required
Returns: a KGroupedTable that contains the re-grouped records of the original KTable
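A sketch supplying explicit Serdes via Grouped (key/value types and serdes are illustrative assumptions):

KGroupedTable<String, Long> grouped = table.groupBy(
    (key, value) -> KeyValue.pair(key.toUpperCase(), value),
    Grouped.with(Serdes.String(), Serdes.Long()));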