Package org.apache.kafka.streams.state
Class Stores
java.lang.Object
    org.apache.kafka.streams.state.Stores
@Evolving public class Stores extends java.lang.Object
Factory for creating state stores in Kafka Streams.

When using the high-level DSL, i.e., StreamsBuilder, users create StoreSuppliers that can be further customized via Materialized. For example, a topic read as KTable can be materialized into an in-memory store with custom key/value serdes and caching disabled:

    StreamsBuilder builder = new StreamsBuilder();
    KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
    KTable<Long,String> table = builder.table(
        "topicName",
        Materialized.<Long,String>as(storeSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(Serdes.String())
            .withCachingDisabled());

When using the Processor API, i.e., Topology, users create StoreBuilders that can be attached to Processors. For example, you can create a windowed RocksDB store with custom changelog topic configuration like:

    Topology topology = new Topology();
    topology.addProcessor("processorName", ...);

    Map<String,String> topicConfig = new HashMap<>();
    StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
        .windowStoreBuilder(
            Stores.persistentWindowStore("queryable-store-name", ...),
            Serdes.Integer(),
            Serdes.Long())
        .withLoggingEnabled(topicConfig);

    topology.addStateStore(storeBuilder, "processorName");
Constructor Summary
Constructor        Description
Stores()
-
Method Summary
Modifier and Type, Method
    Description

static KeyValueBytesStoreSupplier inMemoryKeyValueStore(java.lang.String name)
    Create an in-memory KeyValueBytesStoreSupplier.

static <K,V> StoreBuilder<KeyValueStore<K,V>> keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
    Creates a StoreBuilder that can be used to build a KeyValueStore.

static KeyValueBytesStoreSupplier lruMap(java.lang.String name, int maxCacheSize)
    Create an LRU Map KeyValueBytesStoreSupplier.

static KeyValueBytesStoreSupplier persistentKeyValueStore(java.lang.String name)
    Create a persistent KeyValueBytesStoreSupplier.

static SessionBytesStoreSupplier persistentSessionStore(java.lang.String name, long retentionPeriod)
    Deprecated since 2.1. Use persistentSessionStore(String, Duration) instead.

static SessionBytesStoreSupplier persistentSessionStore(java.lang.String name, java.time.Duration retentionPeriod)
    Create a persistent SessionBytesStoreSupplier.

static WindowBytesStoreSupplier persistentWindowStore(java.lang.String name, long retentionPeriod, int numSegments, long windowSize, boolean retainDuplicates)
    Deprecated since 2.1. Use persistentWindowStore(String, Duration, Duration, boolean) instead.

static WindowBytesStoreSupplier persistentWindowStore(java.lang.String name, java.time.Duration retentionPeriod, java.time.Duration windowSize, boolean retainDuplicates)
    Create a persistent WindowBytesStoreSupplier.

static <K,V> StoreBuilder<SessionStore<K,V>> sessionStoreBuilder(SessionBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
    Creates a StoreBuilder that can be used to build a SessionStore.

static <K,V> StoreBuilder<WindowStore<K,V>> windowStoreBuilder(WindowBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
    Creates a StoreBuilder that can be used to build a WindowStore.
Method Detail
-
persistentKeyValueStore
public static KeyValueBytesStoreSupplier persistentKeyValueStore(java.lang.String name)
Create a persistent KeyValueBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
Returns:
    an instance of a KeyValueBytesStoreSupplier that can be used to build a persistent store
-
inMemoryKeyValueStore
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(java.lang.String name)
Create an in-memory KeyValueBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
Returns:
    an instance of a KeyValueBytesStoreSupplier that can be used to build an in-memory store
-
lruMap
public static KeyValueBytesStoreSupplier lruMap(java.lang.String name, int maxCacheSize)
Create an LRU Map KeyValueBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
    maxCacheSize - maximum number of items in the LRU (cannot be negative)
Returns:
    an instance of a KeyValueBytesStoreSupplier that can be used to build an LRU Map based store
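To illustrate what a bound on the number of items means for an LRU store, here is a minimal sketch of least-recently-used eviction built on java.util.LinkedHashMap. The class name LruSketch is hypothetical. It models the eviction behavior only and is not the store that lruMap returns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative LRU map (hypothetical class); models eviction only, not Kafka's store. */
public class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxCacheSize;

    public LruSketch(int maxCacheSize) {
        // accessOrder = true: iteration runs from least to most recently accessed
        super(16, 0.75f, true);
        if (maxCacheSize < 0) {
            throw new IllegalArgumentException("maxCacheSize cannot be negative");
        }
        this.maxCacheSize = maxCacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; evict once the item count exceeds the bound
        return size() > maxCacheSize;
    }

    public static void main(String[] args) {
        LruSketch<String, Long> lru = new LruSketch<>(2);
        lru.put("a", 1L);
        lru.put("b", 2L);
        lru.get("a");      // touch "a" so that "b" becomes the eldest entry
        lru.put("c", 3L);  // exceeds the bound of 2, so "b" is evicted
        System.out.println(lru.keySet()); // [a, c]
    }
}
```

The bound counts entries, not bytes, which matches the description of maxCacheSize as a maximum number of items.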
-
persistentWindowStore
@Deprecated public static WindowBytesStoreSupplier persistentWindowStore(java.lang.String name, long retentionPeriod, int numSegments, long windowSize, boolean retainDuplicates)
Deprecated since 2.1. Use persistentWindowStore(String, Duration, Duration, boolean) instead.
Create a persistent WindowBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
    retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
    numSegments - number of db segments (cannot be zero or negative)
    windowSize - size of the windows that are stored (cannot be negative). Note: the window size is not stored with the records, so this value is used to compute the keys that the store returns. No effort is made to validate this parameter, so you must be careful to set it the same as the windowed keys you're actually storing.
    retainDuplicates - whether or not to retain duplicates.
Returns:
    an instance of WindowBytesStoreSupplier
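The note on windowSize can be illustrated with a small model. It assumes the store persists only a window's start timestamp and derives the end as start plus the configured size when it returns keys. WindowEndSketch and windowEnd are hypothetical names and are not part of the Kafka API.

```java
/** Hypothetical model of how a window end could be derived from a stored start. */
public class WindowEndSketch {

    /** Assumption: only startMs is persisted; the end is recomputed from the configured size. */
    public static long windowEnd(long storedStartMs, long configuredWindowSizeMs) {
        return storedStartMs + configuredWindowSizeMs;
    }

    public static void main(String[] args) {
        long startMs = 60_000L;

        // Store configured with the same size as the windows that were written
        System.out.println(windowEnd(startMs, 60_000L)); // 120000

        // Store configured with a different size: no error, but a different window end
        System.out.println(windowEnd(startMs, 30_000L)); // 90000
    }
}
```

Under this assumption a mismatched windowSize does not fail; it silently yields keys with the wrong window end, which is why the parameter has to match the windows actually being stored.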
-
persistentWindowStore
public static WindowBytesStoreSupplier persistentWindowStore(java.lang.String name, java.time.Duration retentionPeriod, java.time.Duration windowSize, boolean retainDuplicates) throws java.lang.IllegalArgumentException
Create a persistent WindowBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
    retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
    windowSize - size of the windows (cannot be negative)
    retainDuplicates - whether or not to retain duplicates.
Returns:
    an instance of WindowBytesStoreSupplier
Throws:
    java.lang.IllegalArgumentException - if retentionPeriod or windowSize can't be represented as long milliseconds
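The two argument constraints above can be sketched with java.time.Duration alone. This is a hedged illustration, not the library's validation code. WindowStoreArgsSketch, toMillisOrThrow, and coversLifeCycle are hypothetical names. The retention check assumes the life-cycle note means retention of at least windowSize plus the grace period, and that the grace period comes from the window definition rather than from this method.

```java
import java.time.Duration;

/** Hypothetical helpers mirroring the documented argument constraints. */
public class WindowStoreArgsSketch {

    /** Converts a Duration to milliseconds, reporting overflow as IllegalArgumentException. */
    public static long toMillisOrThrow(Duration value, String name) {
        try {
            return value.toMillis(); // throws ArithmeticException on long overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " can't be represented as long milliseconds", e);
        }
    }

    /** Assumed rule: retention must cover the whole window plus the grace period. */
    public static boolean coversLifeCycle(Duration retention, Duration windowSize, Duration grace) {
        return retention.compareTo(windowSize.plus(grace)) >= 0;
    }

    public static void main(String[] args) {
        Duration windowSize = Duration.ofMinutes(5);
        Duration grace = Duration.ofMinutes(1);

        System.out.println(coversLifeCycle(Duration.ofMinutes(6), windowSize, grace)); // true
        System.out.println(coversLifeCycle(Duration.ofMinutes(5), windowSize, grace)); // false
        System.out.println(toMillisOrThrow(windowSize, "windowSize"));                 // 300000
    }
}
```

A Duration such as Duration.ofSeconds(Long.MAX_VALUE) is valid on its own but overflows when converted to milliseconds, which is the case the Throws clause describes.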
-
persistentSessionStore
@Deprecated public static SessionBytesStoreSupplier persistentSessionStore(java.lang.String name, long retentionPeriod)
Deprecated since 2.1. Use persistentSessionStore(String, Duration) instead.
Create a persistent SessionBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
    retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
Returns:
    an instance of a SessionBytesStoreSupplier
-
persistentSessionStore
public static SessionBytesStoreSupplier persistentSessionStore(java.lang.String name, java.time.Duration retentionPeriod)
Create a persistent SessionBytesStoreSupplier.
Parameters:
    name - name of the store (cannot be null)
    retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
Returns:
    an instance of a SessionBytesStoreSupplier
-
windowStoreBuilder
public static <K,V> StoreBuilder<WindowStore<K,V>> windowStoreBuilder(WindowBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
Creates a StoreBuilder that can be used to build a WindowStore.
Type Parameters:
    K - key type
    V - value type
Parameters:
    supplier - a WindowBytesStoreSupplier (cannot be null)
    keySerde - the key serde to use
    valueSerde - the value serde to use; if the serialized bytes are null for put operations, the put is treated as a delete
Returns:
    an instance of StoreBuilder that can build a WindowStore
-
keyValueStoreBuilder
public static <K,V> StoreBuilder<KeyValueStore<K,V>> keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
Creates a StoreBuilder that can be used to build a KeyValueStore.
Type Parameters:
    K - key type
    V - value type
Parameters:
    supplier - a KeyValueBytesStoreSupplier (cannot be null)
    keySerde - the key serde to use
    valueSerde - the value serde to use; if the serialized bytes are null for put operations, the put is treated as a delete
Returns:
    an instance of a StoreBuilder that can build a KeyValueStore
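The valueSerde note describes a put with null serialized bytes acting as a delete. A minimal model of that behavior, using a plain HashMap in place of a real store, might look like the following. NullAsDeleteSketch is a hypothetical class, the serializer is a java.util.function.Function standing in for the serde, and keys are left unserialized for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of "null serialized bytes on put means delete". */
public class NullAsDeleteSketch<K, V> {
    private final Map<K, byte[]> bytesStore = new HashMap<>();
    private final Function<V, byte[]> valueSerializer; // stand-in for the value serde

    public NullAsDeleteSketch(Function<V, byte[]> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    public void put(K key, V value) {
        byte[] bytes = valueSerializer.apply(value);
        if (bytes == null) {
            bytesStore.remove(key);     // null bytes: treat the put as a delete
        } else {
            bytesStore.put(key, bytes); // otherwise store the serialized value
        }
    }

    public boolean contains(K key) {
        return bytesStore.containsKey(key);
    }

    public int size() {
        return bytesStore.size();
    }

    public static void main(String[] args) {
        // Serializer that maps a null value to null bytes
        NullAsDeleteSketch<String, String> store = new NullAsDeleteSketch<>(
            v -> v == null ? null : v.getBytes(StandardCharsets.UTF_8));

        store.put("k", "v");
        System.out.println(store.contains("k")); // true
        store.put("k", null);
        System.out.println(store.contains("k")); // false
    }
}
```

In this model the delete is decided by the serialized bytes rather than by the value itself, so a serializer that returns null for some non-null value would also remove the entry.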
-
sessionStoreBuilder
public static <K,V> StoreBuilder<SessionStore<K,V>> sessionStoreBuilder(SessionBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
Creates a StoreBuilder that can be used to build a SessionStore.
Type Parameters:
    K - key type
    V - value type
Parameters:
    supplier - a SessionBytesStoreSupplier (cannot be null)
    keySerde - the key serde to use
    valueSerde - the value serde to use; if the serialized bytes are null for put operations, the put is treated as a delete
Returns:
    an instance of StoreBuilder that can build a SessionStore