Class Stores


  • @Evolving
    public class Stores
    extends java.lang.Object
    Factory for creating state stores in Kafka Streams.

    When using the high-level DSL, i.e., StreamsBuilder, users create StoreSuppliers that can be further customized via Materialized. For example, a topic read as a KTable can be materialized into an in-memory store with custom key/value serdes and caching disabled:

    
     StreamsBuilder builder = new StreamsBuilder();
     KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
     KTable<Long,String> table = builder.table(
       "topicName",
       Materialized.<Long,String>as(storeSupplier)
                   .withKeySerde(Serdes.Long())
                   .withValueSerde(Serdes.String())
                   .withCachingDisabled());
     
    When using the Processor API, i.e., Topology, users create StoreBuilders that can be attached to Processors. For example, you can create a windowed RocksDB store with a custom changelog topic configuration as follows:
    
     Topology topology = new Topology();
     topology.addProcessor("processorName", ...);
    
     Map<String,String> topicConfig = new HashMap<>();
     StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
       .windowStoreBuilder(
         Stores.persistentWindowStore("queryable-store-name", ...),
         Serdes.Integer(),
         Serdes.Long())
       .withLoggingEnabled(topicConfig);
    
     topology.addStateStore(storeBuilder, "processorName");
     
    • Constructor Detail

      • Stores

        public Stores()
    • Method Detail

      • persistentWindowStore

        @Deprecated
        public static WindowBytesStoreSupplier persistentWindowStore​(java.lang.String name,
                                                                     long retentionPeriod,
                                                                     int numSegments,
                                                                     long windowSize,
                                                                     boolean retainDuplicates)
        Deprecated. Use persistentWindowStore(String, Duration, Duration, boolean) instead.
        Create a persistent WindowBytesStoreSupplier.
        Parameters:
        name - name of the store (cannot be null)
        retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
        numSegments - number of db segments (cannot be zero or negative)
        windowSize - size of the windows that are stored (cannot be negative). Note: the window size is not stored with the records, so this value is used to compute the keys that the store returns. No effort is made to validate this parameter, so you must take care to set it to the window size of the windowed keys you're actually storing.
        retainDuplicates - whether or not to retain duplicates.
        Returns:
        an instance of WindowBytesStoreSupplier
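
        The note on windowSize can be illustrated with a small, JDK-only sketch. It assumes that a returned window is rebuilt as start timestamp plus the configured window size; the class and the helper windowEndMs are hypothetical and are not part of the Kafka Streams API.

```java
public class WindowBoundsSketch {

    // Hypothetical helper: derives a window's end from its stored start timestamp
    // and the window size configured on the store. The size itself is assumed
    // not to be stored with the record.
    static long windowEndMs(long windowStartMs, long configuredWindowSizeMs) {
        return windowStartMs + configuredWindowSizeMs;
    }

    public static void main(String[] args) {
        long startMs = 60_000L;
        long actualSizeMs = 30_000L;        // size used when the records were written
        long misconfiguredSizeMs = 10_000L; // size passed to the store by mistake

        System.out.println(windowEndMs(startMs, actualSizeMs));        // prints 90000
        System.out.println(windowEndMs(startMs, misconfiguredSizeMs)); // prints 70000
    }
}
```

        Under that assumption, a mismatched windowSize does not fail; it only produces windows with the wrong end, which is why the value has to be set carefully.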
      • persistentWindowStore

        public static WindowBytesStoreSupplier persistentWindowStore​(java.lang.String name,
                                                                     java.time.Duration retentionPeriod,
                                                                     java.time.Duration windowSize,
                                                                     boolean retainDuplicates)
                                                              throws java.lang.IllegalArgumentException
        Create a persistent WindowBytesStoreSupplier.
        Parameters:
        name - name of the store (cannot be null)
        retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
        windowSize - size of the windows (cannot be negative)
        retainDuplicates - whether or not to retain duplicates.
        Returns:
        an instance of WindowBytesStoreSupplier
        Throws:
        java.lang.IllegalArgumentException - if retentionPeriod or windowSize can't be represented as long milliseconds
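
        The two constraints above (a Duration must fit into long milliseconds, and the retention period must cover the window plus the grace period) can be sketched with the JDK alone. This is only an illustration: toMillisOrThrow and minimumRetentionMs are hypothetical helpers, and the assumption is that the library's check resembles an overflow test on Duration.toMillis().

```java
import java.time.Duration;

public class DurationMillisSketch {

    // Hypothetical helper, not part of Kafka: converts a Duration to milliseconds
    // and reports a value that overflows long as an IllegalArgumentException.
    static long toMillisOrThrow(Duration duration, String name) {
        try {
            return duration.toMillis();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " can't be represented as long milliseconds", e);
        }
    }

    // Hypothetical helper: the smallest retention that still covers a full
    // window (window-start through window-end) plus the grace period.
    static long minimumRetentionMs(Duration windowSize, Duration grace) {
        return Math.addExact(
            toMillisOrThrow(windowSize, "windowSize"),
            toMillisOrThrow(grace, "grace"));
    }

    public static void main(String[] args) {
        // 5 minutes of window plus 1 minute of grace
        System.out.println(
            minimumRetentionMs(Duration.ofMinutes(5), Duration.ofMinutes(1))); // prints 360000

        try {
            // Long.MAX_VALUE seconds cannot be expressed in long milliseconds
            toMillisOrThrow(Duration.ofSeconds(Long.MAX_VALUE), "retentionPeriod");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

        A retentionPeriod below the value returned by minimumRetentionMs would, under these assumptions, be too short to hold the windowed data for its whole life cycle.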
      • persistentSessionStore

        @Deprecated
        public static SessionBytesStoreSupplier persistentSessionStore​(java.lang.String name,
                                                                       long retentionPeriod)
        Deprecated. Use persistentSessionStore(String, Duration) instead.
        Create a persistent SessionBytesStoreSupplier.
        Parameters:
        name - name of the store (cannot be null)
        retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
        Returns:
        an instance of a SessionBytesStoreSupplier
      • persistentSessionStore

        public static SessionBytesStoreSupplier persistentSessionStore​(java.lang.String name,
                                                                       java.time.Duration retentionPeriod)
        Create a persistent SessionBytesStoreSupplier.
        Parameters:
        name - name of the store (cannot be null)
        retentionPeriod - length of time to retain data in the store (cannot be negative). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
        Returns:
        an instance of a SessionBytesStoreSupplier
      • windowStoreBuilder

        public static <K,​V> StoreBuilder<WindowStore<K,​V>> windowStoreBuilder​(WindowBytesStoreSupplier supplier,
                                                                                          org.apache.kafka.common.serialization.Serde<K> keySerde,
                                                                                          org.apache.kafka.common.serialization.Serde<V> valueSerde)
        Creates a StoreBuilder that can be used to build a WindowStore.
        Type Parameters:
        K - key type
        V - value type
        Parameters:
        supplier - a WindowBytesStoreSupplier (cannot be null)
        keySerde - the key serde to use
        valueSerde - the value serde to use; if the serialized bytes are null for a put operation, the put is treated as a delete
        Returns:
        an instance of StoreBuilder that can build a WindowStore
      • keyValueStoreBuilder

        public static <K,​V> StoreBuilder<KeyValueStore<K,​V>> keyValueStoreBuilder​(KeyValueBytesStoreSupplier supplier,
                                                                                              org.apache.kafka.common.serialization.Serde<K> keySerde,
                                                                                              org.apache.kafka.common.serialization.Serde<V> valueSerde)
        Creates a StoreBuilder that can be used to build a KeyValueStore.
        Type Parameters:
        K - key type
        V - value type
        Parameters:
        supplier - a KeyValueBytesStoreSupplier (cannot be null)
        keySerde - the key serde to use
        valueSerde - the value serde to use; if the serialized bytes are null for a put operation, the put is treated as a delete
        Returns:
        an instance of a StoreBuilder that can build a KeyValueStore
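
        The valueSerde rule (a put whose serialized value is null acts as a delete) can be pictured with a minimal in-memory stand-in. NullAsDeleteSketch is a hypothetical class written only for this illustration; it uses a plain HashMap and a serializer function rather than a real Serde or KeyValueStore.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class NullAsDeleteSketch<V> {

    private final Map<String, byte[]> bytesStore = new HashMap<>();
    private final Function<V, byte[]> serializer; // stands in for the value serde

    public NullAsDeleteSketch(Function<V, byte[]> serializer) {
        this.serializer = serializer;
    }

    // A put whose serialized form is null removes the key instead of storing it.
    public void put(String key, V value) {
        byte[] bytes = serializer.apply(value);
        if (bytes == null) {
            bytesStore.remove(key);
        } else {
            bytesStore.put(key, bytes);
        }
    }

    public boolean contains(String key) {
        return bytesStore.containsKey(key);
    }

    public static void main(String[] args) {
        NullAsDeleteSketch<String> store = new NullAsDeleteSketch<>(
            s -> s == null ? null : s.getBytes(StandardCharsets.UTF_8));

        store.put("k", "v");
        System.out.println(store.contains("k")); // prints true

        store.put("k", null); // serializes to null, so the entry is removed
        System.out.println(store.contains("k")); // prints false
    }
}
```

        The decision rests on the serialized bytes, so in this sketch a serializer that maps some non-null value to null would also turn that put into a delete.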
      • sessionStoreBuilder

        public static <K,​V> StoreBuilder<SessionStore<K,​V>> sessionStoreBuilder​(SessionBytesStoreSupplier supplier,
                                                                                            org.apache.kafka.common.serialization.Serde<K> keySerde,
                                                                                            org.apache.kafka.common.serialization.Serde<V> valueSerde)
        Creates a StoreBuilder that can be used to build a SessionStore.
        Type Parameters:
        K - key type
        V - value type
        Parameters:
        supplier - a SessionBytesStoreSupplier (cannot be null)
        keySerde - the key serde to use
        valueSerde - the value serde to use; if the serialized bytes are null for a put operation, the put is treated as a delete
        Returns:
        an instance of StoreBuilder that can build a SessionStore