Class Topology


  • public class Topology
    extends java.lang.Object
    A logical representation of a ProcessorTopology. A topology is an acyclic graph of sources, processors, and sinks. A source is a node in the graph that consumes one or more Kafka topics and forwards their records to its successor nodes. A processor is a node in the graph that receives input records from upstream nodes, processes the records, and optionally forwards new records to one or all of its downstream nodes. Finally, a sink is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. A Topology allows you to construct an acyclic graph of these nodes, which can then be passed into a new KafkaStreams instance that will begin consuming, processing, and producing records.
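
    For example, a minimal sketch of building and running a topology (the topic names, node names, and configuration values below are illustrative assumptions, not part of this API):

        import java.util.Properties;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.Topology;

        Topology topology = new Topology();
        topology.addSource("Source", "input-topic");        // source node reading "input-topic"
        topology.addSink("Sink", "output-topic", "Source"); // sink node writing to "output-topic"

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();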
    • Constructor Summary

      Constructors
      Topology()
    • Method Summary

      <K,V> Topology addGlobalStore​(StoreBuilder<?> storeBuilder, java.lang.String sourceName, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, java.lang.String topic, java.lang.String processorName, ProcessorSupplier<K,V> stateUpdateSupplier)
          Adds a global StateStore to the topology.
      <K,V> Topology addGlobalStore​(StoreBuilder<?> storeBuilder, java.lang.String sourceName, TimestampExtractor timestampExtractor, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, java.lang.String topic, java.lang.String processorName, ProcessorSupplier<K,V> stateUpdateSupplier)
          Adds a global StateStore to the topology.
      Topology addProcessor​(java.lang.String name, ProcessorSupplier supplier, java.lang.String... parentNames)
          Add a new processor node that receives and processes records output by one or more parent source or processor nodes.
      Topology addSink​(java.lang.String name, java.lang.String topic, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      <K,V> Topology addSink​(java.lang.String name, java.lang.String topic, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      <K,V> Topology addSink​(java.lang.String name, java.lang.String topic, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      <K,V> Topology addSink​(java.lang.String name, java.lang.String topic, StreamPartitioner<? super K,? super V> partitioner, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using the supplied partitioner.
      <K,V> Topology addSink​(java.lang.String name, TopicNameExtractor<K,V> topicExtractor, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor.
      <K,V> Topology addSink​(java.lang.String name, TopicNameExtractor<K,V> topicExtractor, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor.
      <K,V> Topology addSink​(java.lang.String name, TopicNameExtractor<K,V> topicExtractor, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor.
      <K,V> Topology addSink​(java.lang.String name, TopicNameExtractor<K,V> topicExtractor, StreamPartitioner<? super K,? super V> partitioner, java.lang.String... parentNames)
          Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor, using the supplied partitioner.
      Topology addSource​(java.lang.String name, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(java.lang.String name, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addSource​(java.lang.String name, org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(java.lang.String name, org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addSource​(TimestampExtractor timestampExtractor, java.lang.String name, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(TimestampExtractor timestampExtractor, java.lang.String name, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, java.lang.String name, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, java.lang.String name, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, java.lang.String name, org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, java.lang.String name, org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, java.lang.String name, TimestampExtractor timestampExtractor, org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, java.lang.String name, TimestampExtractor timestampExtractor, org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, java.lang.String name, java.lang.String... topics)
          Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      Topology addSource​(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, java.lang.String name, java.util.regex.Pattern topicPattern)
          Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
      Topology addStateStore​(StoreBuilder<?> storeBuilder, java.lang.String... processorNames)
          Adds a state store.
      Topology connectProcessorAndStateStores​(java.lang.String processorName, java.lang.String... stateStoreNames)
          Connects the processor and the state stores.
      TopologyDescription describe()
          Returns a description of this Topology.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • Topology

        public Topology()
    • Method Detail

      • addSource

        public Topology addSource​(java.lang.String name,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
        Parameters:
        name - the unique name of the source used to reference this node when adding processor children.
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
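
        For example, a sketch that consumes every topic matching a prefix (the pattern and node name here are illustrative):

            import java.util.regex.Pattern;

            topology.addSource("OrdersSource", Pattern.compile("orders-.*"));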
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  java.lang.String name,
                                  java.lang.String... topics)
        Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
        Parameters:
        offsetReset - the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
        name - the unique name of the source used to reference this node when adding processor children.
        topics - the name of one or more Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
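
        For example, starting from the earliest available offset when no committed offset exists (the topic and node names are illustrative):

            topology.addSource(Topology.AutoOffsetReset.EARLIEST, "OrdersSource", "orders");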
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  java.lang.String name,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
        Parameters:
        offsetReset - the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
        name - the unique name of the source used to reference this node when adding processor children.
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
      • addSource

        public Topology addSource​(TimestampExtractor timestampExtractor,
                                  java.lang.String name,
                                  java.lang.String... topics)
        Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
        Parameters:
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        name - the unique name of the source used to reference this node when adding processor children.
        topics - the name of one or more Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
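
        For example, a sketch of a custom extractor that falls back to the previous timestamp when a record carries an invalid one (the topic and node names are illustrative):

            import org.apache.kafka.streams.processor.TimestampExtractor;

            TimestampExtractor extractor = (record, previousTimestamp) ->
                record.timestamp() >= 0 ? record.timestamp() : previousTimestamp;
            topology.addSource(extractor, "OrdersSource", "orders");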
      • addSource

        public Topology addSource​(TimestampExtractor timestampExtractor,
                                  java.lang.String name,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
        Parameters:
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        name - the unique name of the source used to reference this node when adding processor children.
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  TimestampExtractor timestampExtractor,
                                  java.lang.String name,
                                  java.lang.String... topics)
        Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
        Parameters:
        offsetReset - the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        name - the unique name of the source used to reference this node when adding processor children.
        topics - the name of one or more Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  TimestampExtractor timestampExtractor,
                                  java.lang.String name,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
        Parameters:
        offsetReset - the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        name - the unique name of the source used to reference this node when adding processor children.
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
      • addSource

        public Topology addSource​(java.lang.String name,
                                  org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer,
                                  org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer,
                                  java.lang.String... topics)
        Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The default TimestampExtractor as specified in the config is used.
        Parameters:
        name - the unique name of the source used to reference this node when adding processor children
        keyDeserializer - key deserializer used to read this source, if not specified the default key deserializer defined in the configs will be used
        valueDeserializer - value deserializer used to read this source, if not specified the default value deserializer defined in the configs will be used
        topics - the name of one or more Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
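
        For example, a sketch overriding the configured default deserializers for a single source (the topic and node names are illustrative):

            import org.apache.kafka.common.serialization.Serdes;

            topology.addSource("OrdersSource",
                               Serdes.String().deserializer(),
                               Serdes.Long().deserializer(),
                               "orders");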
      • addSource

        public Topology addSource​(java.lang.String name,
                                  org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer,
                                  org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all matched topics, so care should be taken to specify patterns that only match topics sharing the same key-value data format. The default TimestampExtractor as specified in the config is used.
        Parameters:
        name - the unique name of the source used to reference this node when adding processor children
        keyDeserializer - key deserializer used to read this source, if not specified the default key deserializer defined in the configs will be used
        valueDeserializer - value deserializer used to read this source, if not specified the default value deserializer defined in the configs will be used
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by name
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  java.lang.String name,
                                  org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer,
                                  org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer,
                                  java.lang.String... topics)
        Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all the specified topics, so all of them should share the same key-value data format.
        Parameters:
        offsetReset - the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest
        name - the unique name of the source used to reference this node when adding processor children
        keyDeserializer - key deserializer used to read this source, if not specified the default key deserializer defined in the configs will be used
        valueDeserializer - value deserializer used to read this source, if not specified the default value deserializer defined in the configs will be used
        topics - the name of one or more Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by name
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  java.lang.String name,
                                  org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer,
                                  org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all matched topics, so care should be taken to specify patterns that only match topics sharing the same key-value data format.
        Parameters:
        offsetReset - the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest
        name - the unique name of the source used to reference this node when adding processor children
        keyDeserializer - key deserializer used to read this source, if not specified the default key deserializer defined in the configs will be used
        valueDeserializer - value deserializer used to read this source, if not specified the default value deserializer defined in the configs will be used
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by name
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  java.lang.String name,
                                  TimestampExtractor timestampExtractor,
                                  org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer,
                                  org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer,
                                  java.lang.String... topics)
        Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers.
        Parameters:
        offsetReset - the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest.
        name - the unique name of the source used to reference this node when adding processor children.
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        keyDeserializer - key deserializer used to read this source, if not specified the default key deserializer defined in the configs will be used
        valueDeserializer - value deserializer used to read this source, if not specified the default value deserializer defined in the configs will be used
        topics - the name of one or more Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by another source
      • addSource

        public Topology addSource​(Topology.AutoOffsetReset offsetReset,
                                  java.lang.String name,
                                  TimestampExtractor timestampExtractor,
                                  org.apache.kafka.common.serialization.Deserializer<?> keyDeserializer,
                                  org.apache.kafka.common.serialization.Deserializer<?> valueDeserializer,
                                  java.util.regex.Pattern topicPattern)
        Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all matched topics, so care should be taken to specify patterns that only match topics sharing the same key-value data format.
        Parameters:
        offsetReset - the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest
        name - the unique name of the source used to reference this node when adding processor children.
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        keyDeserializer - key deserializer used to read this source, if not specified the default key deserializer defined in the configs will be used
        valueDeserializer - value deserializer used to read this source, if not specified the default value deserializer defined in the configs will be used
        topicPattern - regular expression pattern to match Kafka topics that this source is to consume
        Returns:
        itself
        Throws:
        TopologyException - if processor is already added or if topics have already been registered by name
      • addSink

        public <K,V> Topology addSink​(java.lang.String name,
                                      java.lang.String topic,
                                      StreamPartitioner<? super K,? super V> partitioner,
                                      java.lang.String... parentNames)
        Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using the supplied partitioner. The sink will use the default key serializer and default value serializer specified in the stream configuration.

        The sink will also use the specified StreamPartitioner to determine how records are distributed among the named Kafka topic's partitions. Such control is often useful with topologies that use state stores in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute records among partitions using Kafka's default partitioning logic.

        Parameters:
        name - the unique name of the sink
        topic - the name of the Kafka topic to which this sink should write its records
        partitioner - the function that should be used to determine the partition for each record processed by the sink
        parentNames - the name of one or more source or processor nodes whose output records this sink should consume and write to its topic
        Returns:
        itself
        Throws:
        TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
        See Also:
        addSink(String, String, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
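
        For example, a sketch of a partitioner that co-locates records by key hash ("Process" is an assumed parent node name):

            import org.apache.kafka.streams.processor.StreamPartitioner;

            StreamPartitioner<String, Object> byKeyHash =
                (topic, key, value, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);
            topology.addSink("Sink", "output-topic", byKeyHash, "Process");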
      • addSink

        public <K,V> Topology addSink​(java.lang.String name,
                                      java.lang.String topic,
                                      org.apache.kafka.common.serialization.Serializer<K> keySerializer,
                                      org.apache.kafka.common.serialization.Serializer<V> valueSerializer,
                                      StreamPartitioner<? super K,? super V> partitioner,
                                      java.lang.String... parentNames)
        Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. The sink will use the specified key and value serializers, and the supplied partitioner.
        Parameters:
        name - the unique name of the sink
        topic - the name of the Kafka topic to which this sink should write its records
        keySerializer - the key serializer used when writing records to the topic; may be null if the sink should use the default key serializer specified in the stream configuration
        valueSerializer - the value serializer used when writing records to the topic; may be null if the sink should use the default value serializer specified in the stream configuration
        partitioner - the function that should be used to determine the partition for each record processed by the sink
        parentNames - the name of one or more source or processor nodes whose output records this sink should consume and write to its topic
        Returns:
        itself
        Throws:
        TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
        See Also:
        addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...)
      • addSink

        public <K,V> Topology addSink​(java.lang.String name,
                                      TopicNameExtractor<K,V> topicExtractor,
                                      StreamPartitioner<? super K,? super V> partitioner,
                                      java.lang.String... parentNames)
        Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor, using the supplied partitioner. Any topic this sink may ever send to must be pre-created. The sink will use the default key serializer and default value serializer specified in the stream configuration.

        The sink will also use the specified StreamPartitioner to determine how records are distributed among the partitions of each extracted Kafka topic. Such control is often useful with topologies that use state stores in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute records among partitions using Kafka's default partitioning logic.

        Parameters:
        name - the unique name of the sink
        topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
        partitioner - the function that should be used to determine the partition for each record processed by the sink
        parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
        Returns:
        itself
        Throws:
        TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
        See Also:
        addSink(String, String, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
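
        For example, a sketch routing each record to a per-key topic via the TopicNameExtractor overloads (the target topics must already exist; "Process" and the topic naming scheme are illustrative):

            import org.apache.kafka.streams.processor.TopicNameExtractor;

            TopicNameExtractor<String, Object> byTenant =
                (key, value, recordContext) -> "events-" + key;
            topology.addSink("DynamicSink", byTenant, "Process");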
      • addSink

        public <K,V> Topology addSink​(java.lang.String name,
                                      TopicNameExtractor<K,V> topicExtractor,
                                      org.apache.kafka.common.serialization.Serializer<K> keySerializer,
                                      org.apache.kafka.common.serialization.Serializer<V> valueSerializer,
                                      java.lang.String... parentNames)
        Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor. Any topic this sink may ever send to must be pre-created. The sink will use the specified key and value serializers.
        Parameters:
        name - the unique name of the sink
        topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
        keySerializer - the key serializer used when writing records to the topics; may be null if the sink should use the default key serializer specified in the stream configuration
        valueSerializer - the value serializer used when writing records to the topics; may be null if the sink should use the default value serializer specified in the stream configuration
        parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
        Returns:
        itself
        Throws:
        TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
        See Also:
        addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      • addSink

        public <K,V> Topology addSink​(java.lang.String name,
                                      TopicNameExtractor<K,V> topicExtractor,
                                      org.apache.kafka.common.serialization.Serializer<K> keySerializer,
                                      org.apache.kafka.common.serialization.Serializer<V> valueSerializer,
                                      StreamPartitioner<? super K,? super V> partitioner,
                                      java.lang.String... parentNames)
        Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor. Any topic this sink may ever send to must be pre-created. The sink will use the specified key and value serializers, and the supplied partitioner.
        Parameters:
        name - the unique name of the sink
        topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
        keySerializer - the key serializer used when writing records to the topics; may be null if the sink should use the default key serializer specified in the stream configuration
        valueSerializer - the value serializer used when writing records to the topics; may be null if the sink should use the default value serializer specified in the stream configuration
        partitioner - the function that should be used to determine the partition for each record processed by the sink
        parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
        Returns:
        itself
        Throws:
        TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
        See Also:
        addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...)
      • addProcessor

        public Topology addProcessor​(java.lang.String name,
                                     ProcessorSupplier supplier,
                                     java.lang.String... parentNames)
        Add a new processor node that receives and processes records output by one or more parent source or processor nodes. Any new record output by this processor will be forwarded to its child processor or sink nodes. If supplier provides stores via ConnectedStoreProvider.stores(), the provided StoreBuilders will be added to the topology and connected to this processor automatically.
        Parameters:
        name - the unique name of the processor node
        supplier - the supplier used to obtain this node's Processor instance
        parentNames - the name of one or more source or processor nodes whose output records this processor should receive and process
        Returns:
        itself
        Throws:
        TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
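
        For example, a minimal sketch of a processor that upper-cases values ("Source" and "Process" are assumed node names):

            import org.apache.kafka.streams.processor.AbstractProcessor;

            topology.addProcessor("Process",
                () -> new AbstractProcessor<String, String>() {
                    @Override
                    public void process(String key, String value) {
                        context().forward(key, value.toUpperCase()); // emit to child nodes
                    }
                },
                "Source");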
      • addStateStore

        public Topology addStateStore​(StoreBuilder<?> storeBuilder,
                                      java.lang.String... processorNames)
        Adds a state store.
        Parameters:
        storeBuilder - the storeBuilder used to obtain the StateStore instance
        processorNames - the names of the processors that should be able to access the provided store
        Returns:
        itself
        Throws:
        TopologyException - if state store supplier is already added
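
        For example, a sketch registering a persistent key-value store and attaching it to one processor ("Process" and the store name are illustrative):

            import org.apache.kafka.common.serialization.Serdes;
            import org.apache.kafka.streams.state.KeyValueStore;
            import org.apache.kafka.streams.state.StoreBuilder;
            import org.apache.kafka.streams.state.Stores;

            StoreBuilder<KeyValueStore<String, Long>> countStore =
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("Counts"),
                    Serdes.String(),
                    Serdes.Long());
            topology.addStateStore(countStore, "Process");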
      • addGlobalStore

        public <K,V> Topology addGlobalStore​(StoreBuilder<?> storeBuilder,
                                             java.lang.String sourceName,
                                             org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer,
                                             org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer,
                                             java.lang.String topic,
                                             java.lang.String processorName,
                                             ProcessorSupplier<K,V> stateUpdateSupplier)
        Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.

        A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.

        The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. This ProcessorNode should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.

        Parameters:
        storeBuilder - user defined state store builder
        sourceName - name of the SourceNode that will be automatically added
        keyDeserializer - the Deserializer to deserialize keys with
        valueDeserializer - the Deserializer to deserialize values with
        topic - the topic to source the data from
        processorName - the name of the ProcessorNode that will be automatically added to maintain the store
        stateUpdateSupplier - the supplier of the Processor used to keep the StateStore up-to-date
        Returns:
        itself
        Throws:
        TopologyException - if the processor or the state store is already registered
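
        For example, a sketch that mirrors a configuration topic into a global in-memory store (all names here are illustrative; note that a global store's builder should have changelogging disabled, since the input topic itself serves as the source of truth):

            import org.apache.kafka.common.serialization.Serdes;
            import org.apache.kafka.streams.processor.AbstractProcessor;
            import org.apache.kafka.streams.processor.ProcessorContext;
            import org.apache.kafka.streams.state.KeyValueStore;
            import org.apache.kafka.streams.state.Stores;

            topology.addGlobalStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("config-store"),
                        Serdes.String(),
                        Serdes.String())
                    .withLoggingDisabled(),
                "config-source",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "config-topic",
                "config-processor",
                () -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(ProcessorContext context) {
                        super.init(context);
                        store = (KeyValueStore<String, String>) context.getStateStore("config-store");
                    }

                    @Override
                    public void process(String key, String value) {
                        store.put(key, value); // keep the global store up-to-date
                    }
                });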
      • addGlobalStore

        public <K,V> Topology addGlobalStore​(StoreBuilder<?> storeBuilder,
                                             java.lang.String sourceName,
                                             TimestampExtractor timestampExtractor,
                                             org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer,
                                             org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer,
                                             java.lang.String topic,
                                             java.lang.String processorName,
                                             ProcessorSupplier<K,V> stateUpdateSupplier)
        Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.

        A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.

        The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. This ProcessorNode should be used to keep the StateStore up-to-date.

        Parameters:
        storeBuilder - user defined key value store builder
        sourceName - name of the SourceNode that will be automatically added
        timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
        keyDeserializer - the Deserializer to deserialize keys with
        valueDeserializer - the Deserializer to deserialize values with
        topic - the topic to source the data from
        processorName - the name of the ProcessorNode that will be automatically added to maintain the store
        stateUpdateSupplier - the supplier of the Processor used to keep the StateStore up-to-date
        Returns:
        itself
        Throws:
        TopologyException - if the processor or the state store is already registered
      • connectProcessorAndStateStores

        public Topology connectProcessorAndStateStores​(java.lang.String processorName,
                                                       java.lang.String... stateStoreNames)
        Connects the processor and the state stores.
        Parameters:
        processorName - the name of the processor
        stateStoreNames - the names of state stores that the processor uses
        Returns:
        itself
        Throws:
        TopologyException - if the processor or a state store is unknown
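
        For example, registering a store with no processors attached and wiring it up later by name ("Process", "Counts", and the countStore builder are assumed to exist):

            topology.addStateStore(countStore);                            // registered, but not yet connected
            topology.connectProcessorAndStateStores("Process", "Counts");  // connect by processor and store name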
      • describe

        public TopologyDescription describe()
        Returns a description of this Topology.
        Returns:
        a description of the topology.
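
        For example, printing the topology to verify its structure before starting the application:

            TopologyDescription description = topology.describe();
            System.out.println(description); // lists sub-topologies, node names, and their connections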