Class KafkaStreams


  • @Evolving
    public class KafkaStreams
    extends java.lang.Object
    A Kafka client that allows for performing continuous computation on input coming from one or more input topics and sends output to zero, one, or more output topics.

    The computational logic can be specified either by using the Topology to define a DAG topology of Processors or by using the StreamsBuilder which provides the high-level DSL to define transformations.

    One KafkaStreams instance can contain one or more threads for the processing work; the number of threads is specified in the configs.
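As an illustration of the thread setting, the sketch below builds a configuration that requests a given number of stream threads. It uses the literal keys application.id, bootstrap.servers, and num.stream.threads, which are the string values of StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, and StreamsConfig.NUM_STREAM_THREADS_CONFIG. The class name, application ID, and broker address are hypothetical.

```java
import java.util.Properties;

final class StreamThreadConfigSketch {
    /** Builds a Streams configuration that asks for the given number of stream threads. */
    static Properties withThreads(int numThreads) {
        Properties props = new Properties();
        // Same value as StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", "my-stream-processing-application");
        // Same value as StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.put("bootstrap.servers", "localhost:9092");
        // Same value as StreamsConfig.NUM_STREAM_THREADS_CONFIG.
        props.put("num.stream.threads", numThreads);
        return props;
    }
}
```

The resulting Properties object could then be passed to any of the KafkaStreams constructors.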

    A KafkaStreams instance can coordinate with any other instances with the same application ID (whether in the same process, in other processes on this machine, or on remote machines) as a single (possibly distributed) stream processing application. These instances will divide up the work based on the assignment of the input topic partitions so that all partitions are being consumed. If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves to balance processing load and ensure that all input topic partitions are processed.

    Internally, a KafkaStreams instance contains normal KafkaProducer and KafkaConsumer instances that are used for reading input and writing output.

    A simple example might look like this:

    
     // The constructors take java.util.Properties, so the configuration is built as Properties.
     Properties props = new Properties();
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
     // Map each input value to its length, rendered as a string, and write it to the output topic.
     StreamsBuilder builder = new StreamsBuilder();
     builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
    
     KafkaStreams streams = new KafkaStreams(builder.build(), props);
     streams.start();
     
    See Also:
    StreamsBuilder, Topology
    • Constructor Detail

      • KafkaStreams

        public KafkaStreams​(Topology topology,
                            java.util.Properties props)
        Create a KafkaStreams instance.

        Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

        Parameters:
        topology - the topology specifying the computational logic
        props - properties for StreamsConfig
        Throws:
        StreamsException - if any fatal error occurs
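The note about closing an instance that was never started can be sketched as follows. This is a minimal illustration, assuming a trivial pass-through topology; the class name, application ID, topic names, and broker address are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

final class CloseWithoutStartSketch {
    /** Creates an instance, never starts it, and still closes it to release its resources. */
    static KafkaStreams.State createThenClose() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-without-start-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic").to("my-output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        try {
            // Placeholder for application setup that may fail before start() is ever reached.
            return streams.state();
        } finally {
            // Runs on every path, including the one where start() was never called.
            streams.close();
        }
    }
}
```

The same try/finally shape applies to the other constructors listed below.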
      • KafkaStreams

        public KafkaStreams​(Topology topology,
                            java.util.Properties props,
                            KafkaClientSupplier clientSupplier)
        Create a KafkaStreams instance.

        Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

        Parameters:
        topology - the topology specifying the computational logic
        props - properties for StreamsConfig
        clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
        Throws:
        StreamsException - if any fatal error occurs
      • KafkaStreams

        public KafkaStreams​(Topology topology,
                            java.util.Properties props,
                            org.apache.kafka.common.utils.Time time)
        Create a KafkaStreams instance.

        Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

        Parameters:
        topology - the topology specifying the computational logic
        props - properties for StreamsConfig
        time - Time implementation; cannot be null
        Throws:
        StreamsException - if any fatal error occurs
      • KafkaStreams

        public KafkaStreams​(Topology topology,
                            java.util.Properties props,
                            KafkaClientSupplier clientSupplier,
                            org.apache.kafka.common.utils.Time time)
        Create a KafkaStreams instance.

        Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

        Parameters:
        topology - the topology specifying the computational logic
        props - properties for StreamsConfig
        clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
        time - Time implementation; cannot be null
        Throws:
        StreamsException - if any fatal error occurs
    • Method Detail

      • setStateListener

        public void setStateListener​(KafkaStreams.StateListener listener)
        An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
        Parameters:
        listener - a new state listener
        Throws:
        java.lang.IllegalStateException - if this KafkaStreams instance is not in state CREATED.
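A listener might look like the following sketch, which records each transition it is told about. The class name and the choice to keep transitions in a list are illustrative assumptions, not part of the API.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;

final class RecordingStateListener implements KafkaStreams.StateListener {
    private final List<String> transitions = new ArrayList<>();

    @Override
    public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        // Store the transition as "OLD -> NEW" so it can be inspected or logged later.
        transitions.add(oldState + " -> " + newState);
    }

    /** Returns a copy of the transitions seen so far. */
    synchronized List<String> transitions() {
        return new ArrayList<>(transitions);
    }
}
```

It would be registered with streams.setStateListener(new RecordingStateListener()) before start(), because the setter throws IllegalStateException once the instance has left state CREATED.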
      • setUncaughtExceptionHandler

        public void setUncaughtExceptionHandler​(java.lang.Thread.UncaughtExceptionHandler eh)
        Set the handler invoked when an internal thread abruptly terminates due to an uncaught exception.
        Parameters:
        eh - the uncaught exception handler for all internal threads; null deletes the current handler
        Throws:
        java.lang.IllegalStateException - if this KafkaStreams instance is not in state CREATED.
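Since the parameter is a plain java.lang.Thread.UncaughtExceptionHandler, a handler can be written without any Kafka types. The sketch below remembers the first failure and logs every one; the class name and the keep-the-first policy are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

final class FirstFailureHandler implements Thread.UncaughtExceptionHandler {
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // Remember only the first failure; later failures are logged but not stored in this sketch.
        firstFailure.compareAndSet(null, error);
        System.err.println("Thread " + thread.getName() + " terminated: " + error);
    }

    /** Returns the first recorded failure, or null if no thread has died yet. */
    Throwable firstFailure() {
        return firstFailure.get();
    }
}
```

It would be passed to streams.setUncaughtExceptionHandler(...) while the instance is still in state CREATED.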
      • setGlobalStateRestoreListener

        public void setGlobalStateRestoreListener​(StateRestoreListener globalStateRestoreListener)
        Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
        Parameters:
        globalStateRestoreListener - The listener triggered when StateStore is being restored.
        Throws:
        java.lang.IllegalStateException - if this KafkaStreams instance is not in state CREATED.
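A restore listener could be sketched as below. The class name and the running total are illustrative assumptions; an atomic counter is used because one listener is shared by all restoring stores.

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

final class CountingRestoreListener implements StateRestoreListener {
    private final AtomicLong restoredRecords = new AtomicLong();

    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.println("Restoring " + storeName + " from " + topicPartition
                + ", offsets " + startingOffset + " to " + endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName,
                                long batchEndOffset, long numRestored) {
        // Accumulate the number of records restored across all stores and partitions.
        restoredRecords.addAndGet(numRestored);
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        System.out.println("Finished restoring " + storeName + " from " + topicPartition
                + ": " + totalRestored + " records");
    }

    /** Total number of records reported through onBatchRestored so far. */
    long restoredRecords() {
        return restoredRecords.get();
    }
}
```

As with the other setters, streams.setGlobalStateRestoreListener(new CountingRestoreListener()) would have to be called before start().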
      • metrics

        public java.util.Map<org.apache.kafka.common.MetricName,​? extends org.apache.kafka.common.Metric> metrics()
        Get a read-only handle on the global metrics registry, including the streams client's own metrics plus the metrics of its embedded consumer clients.
        Returns:
        Map of all metrics.
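One way to consume the returned map is to flatten it into sorted, printable entries, as in this sketch. The class name and the key format (group, name, tags) are assumptions chosen for readability.

```java
import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

final class MetricsSnapshotSketch {
    /** Copies the current metric values into a map sorted by "group:name{tags}". */
    static Map<String, Object> snapshot(Map<MetricName, ? extends Metric> metrics) {
        Map<String, Object> result = new TreeMap<>();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            String key = name.group() + ":" + name.name() + name.tags();
            // metricValue() returns the current value, which may be numeric or not.
            result.put(key, entry.getValue().metricValue());
        }
        return result;
    }
}
```

A caller would typically invoke MetricsSnapshotSketch.snapshot(streams.metrics()) and print or export the result.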
      • start

        public void start()
                   throws java.lang.IllegalStateException,
                          StreamsException
        Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client.

        Because threads are started in the background, this method does not block. However, if you have global stores in your topology, this method blocks until all global stores are restored. Because processing runs in background threads, any fatal exception that happens during processing is by default only logged. If you want to be notified about dying threads, you can register an uncaught exception handler before starting the KafkaStreams instance.

        Note: for brokers with version 0.9.x or lower, the broker version cannot be checked. No error is raised; the client hangs and keeps retrying to verify the broker version until it times out.

        Throws:
        java.lang.IllegalStateException - if process was already started
        StreamsException - if the Kafka brokers have version 0.10.0.x or if exactly-once is enabled for pre 0.11.0.x brokers
      • close

        public void close()
        Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. This will block until all threads have stopped.
      • close

        @Deprecated
        public boolean close​(long timeout,
                             java.util.concurrent.TimeUnit timeUnit)
        Deprecated.
        Use close(Duration) instead. Note that close(Duration) has different semantics and does not block on zero, e.g., Duration.ofMillis(0).
        Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of 0 means to wait forever.
        Parameters:
        timeout - how long to wait for the threads to shut down. Can't be negative. A timeout of 0 waits forever.
        timeUnit - unit of time used for timeout
        Returns:
        true if all threads were successfully stopped; false if the timeout was reached before all threads stopped. Note that this method must not be called in the onChange callback of KafkaStreams.StateListener.
      • close

        public boolean close​(java.time.Duration timeout)
                      throws java.lang.IllegalArgumentException
        Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the threads to join. Unlike the deprecated close(long, TimeUnit), a timeout of 0 does not block: the method only checks the state and returns immediately.
        Parameters:
        timeout - how long to wait for the threads to shut down
        Returns:
        true if all threads were successfully stopped; false if the timeout was reached before all threads stopped. Note that this method must not be called in the KafkaStreams.StateListener.onChange(State, State) callback.
        Throws:
        java.lang.IllegalArgumentException - if timeout can't be represented as long milliseconds
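Because the deprecated overload treats 0 as "wait forever" while close(Duration) does not block on zero, callers migrating between the two may want a small conversion helper. The sketch below is one possible mapping, not part of the API; the class and method names are hypothetical, and rejecting negative values is an assumption based on the parameter description above.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class CloseTimeoutMigration {
    /** Converts a timeout for the deprecated close(long, TimeUnit) into a Duration for close(Duration). */
    static Duration toDuration(long timeout, TimeUnit timeUnit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout can't be negative");
        }
        long millis = timeUnit.toMillis(timeout);
        // Old semantics: 0 means wait forever. New semantics: 0 returns immediately.
        // Map 0 to the largest timeout that is still representable as long milliseconds.
        return millis == 0 ? Duration.ofMillis(Long.MAX_VALUE) : Duration.ofMillis(millis);
    }
}
```

For example, a former streams.close(0, TimeUnit.SECONDS) would become streams.close(CloseTimeoutMigration.toDuration(0, TimeUnit.SECONDS)).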
      • allMetadata

        public java.util.Collection<StreamsMetadata> allMetadata()
        Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.

        Note: this is a point in time view and it may change due to partition reassignment.

        Returns:
        StreamsMetadata for each KafkaStreams instance of this application
      • allMetadataForStore

        public java.util.Collection<StreamsMetadata> allMetadataForStore​(java.lang.String storeName)
        Find all currently running KafkaStreams instances (potentially remotely) that
        • use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
        • and that contain a StateStore with the given storeName
        and return StreamsMetadata for each discovered instance.

        Note: this is a point in time view and it may change due to partition reassignment.

        Parameters:
        storeName - the storeName to find metadata for
        Returns:
        StreamsMetadata for each KafkaStreams instance of this application that contains the provided storeName
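The collections returned by allMetadata() and allMetadataForStore(String) can be turned into a list of endpoints, for instance to build a routing table. This sketch assumes the StreamsMetadata class from org.apache.kafka.streams.state as used by the API version documented here; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.streams.state.StreamsMetadata;

final class InstanceDirectorySketch {
    /** Lists "host:port" for every discovered instance. */
    static List<String> endpoints(Collection<StreamsMetadata> instances) {
        List<String> result = new ArrayList<>();
        for (StreamsMetadata metadata : instances) {
            // host() and port() reflect what each instance advertised in its configuration.
            result.add(metadata.host() + ":" + metadata.port());
        }
        return result;
    }
}
```

A caller might use InstanceDirectorySketch.endpoints(streams.allMetadataForStore("word-counts")), where the store name is made up for the example. The result is a point-in-time view, like the metadata itself.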
      • metadataForKey

        public <K> StreamsMetadata metadataForKey​(java.lang.String storeName,
                                                  K key,
                                                  org.apache.kafka.common.serialization.Serializer<K> keySerializer)
        Find the currently running KafkaStreams instance (potentially remotely) that
        • uses the same application ID as this instance (i.e., belongs to the same Kafka Streams application)
        • and that contains a StateStore with the given storeName
        • and whose StateStore contains the given key
        and return StreamsMetadata for it.

        This will use the default Kafka Streams partitioner to locate the partition. If a custom partitioner has been configured via StreamsConfig or KStream.through(String, Produced), or if the original KTable's input topic is partitioned differently, please use metadataForKey(String, Object, StreamPartitioner).

        Note:

        • this is a point in time view and it may change due to partition reassignment
        • the key may not exist in the StateStore; this method provides a way of finding which host it would exist on
        • if this is for a window store the serializer should be the serializer for the record key, not the window serializer
        Type Parameters:
        K - key type
        Parameters:
        storeName - the storeName to find metadata for
        key - the key to find metadata for
        keySerializer - serializer for the key
        Returns:
        StreamsMetadata for the KafkaStreams instance of this application with the provided storeName and key, or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing
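Callers usually need to handle the NOT_AVAILABLE result before routing a request. The sketch below shows one way to do that, assuming the StreamsMetadata class from org.apache.kafka.streams.state; the class and method names are hypothetical, and the null check is purely defensive.

```java
import java.util.Optional;
import org.apache.kafka.streams.state.StreamsMetadata;

final class KeyLocationSketch {
    /** Returns "host:port" of the instance that would hold the key, or empty if it is not known yet. */
    static Optional<String> endpointFor(StreamsMetadata metadata) {
        if (metadata == null || StreamsMetadata.NOT_AVAILABLE.equals(metadata)) {
            // Kafka Streams is (re-)initializing, so the caller should retry later.
            return Optional.empty();
        }
        return Optional.of(metadata.host() + ":" + metadata.port());
    }
}
```

It could be combined with a lookup such as streams.metadataForKey("word-counts", "kafka", Serdes.String().serializer()), where the store name and key are invented for the example.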
      • metadataForKey

        public <K> StreamsMetadata metadataForKey​(java.lang.String storeName,
                                                  K key,
                                                  StreamPartitioner<? super K,​?> partitioner)
        Find the currently running KafkaStreams instance (potentially remotely) that
        • uses the same application ID as this instance (i.e., belongs to the same Kafka Streams application)
        • and that contains a StateStore with the given storeName
        • and whose StateStore contains the given key
        and return StreamsMetadata for it.

        Note:

        • this is a point in time view and it may change due to partition reassignment
        • the key may not exist in the StateStore; this method provides a way of finding which host it would exist on
        Type Parameters:
        K - key type
        Parameters:
        storeName - the storeName to find metadata for
        key - the key to find metadata for
        partitioner - the partitioner to be used to locate the host for the key
        Returns:
        StreamsMetadata for the KafkaStreams instance of this application with the provided storeName and key, or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing
      • store

        public <T> T store​(java.lang.String storeName,
                           QueryableStoreType<T> queryableStoreType)
        Get a facade wrapping the local StateStore instances with the provided storeName if the Store's type is accepted by the provided queryableStoreType. The returned object can be used to query the StateStore instances.
        Type Parameters:
        T - return type
        Parameters:
        storeName - name of the store to find
        queryableStoreType - accept only stores that are accepted by QueryableStoreType.accepts(StateStore)
        Returns:
        A facade wrapping the local StateStore instances
        Throws:
        InvalidStateStoreException - if Kafka Streams is (re-)initializing or a store with storeName and queryableStoreType doesn't exist
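Because InvalidStateStoreException is thrown while Kafka Streams is (re-)initializing, callers often retry the lookup a bounded number of times. The helper below is a sketch of that idea, not part of the API; its name, parameters, and fixed backoff are assumptions. The bound matters because a store that does not exist will never become available.

```java
import java.util.function.Supplier;
import org.apache.kafka.streams.errors.InvalidStateStoreException;

final class StoreLookupSketch {
    /** Calls the lookup until it succeeds or maxAttempts is reached, sleeping backoffMs between tries. */
    static <T> T lookupWithRetry(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        InvalidStateStoreException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (InvalidStateStoreException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException interrupted) {
                        // Preserve the interrupt flag and give up with the last lookup failure.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        throw last;
    }
}
```

A call could look like StoreLookupSketch.lookupWithRetry(() -> streams.store("word-counts", QueryableStoreTypes.<String, Long>keyValueStore()), 10, 100L), where the store name and types are invented for the example.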
      • localThreadsMetadata

        public java.util.Set<ThreadMetadata> localThreadsMetadata()
        Returns runtime information about the local threads of this KafkaStreams instance.
        Returns:
        the set of ThreadMetadata.
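The returned set can be rendered as a short report, for example for a health endpoint. This sketch assumes the ThreadMetadata class from org.apache.kafka.streams.processor as used by the API version documented here; the class name and output format are hypothetical.

```java
import java.util.Set;
import org.apache.kafka.streams.processor.ThreadMetadata;

final class ThreadReportSketch {
    /** Produces one line per local thread with its name, state, and task counts. */
    static String report(Set<ThreadMetadata> threads) {
        StringBuilder sb = new StringBuilder();
        for (ThreadMetadata thread : threads) {
            sb.append(thread.threadName())
              .append(" [").append(thread.threadState()).append("]")
              .append(" active=").append(thread.activeTasks().size())
              .append(" standby=").append(thread.standbyTasks().size())
              .append('\n');
        }
        return sb.toString();
    }
}
```

It would be called as ThreadReportSketch.report(streams.localThreadsMetadata()).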