package io.confluent.ksql.structured;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

/* loaded from: input_file:io/confluent/ksql/structured/QueuedSchemaKStream.class */
public class QueuedSchemaKStream extends SchemaKStream {
    private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;

    /* loaded from: input_file:io/confluent/ksql/structured/QueuedSchemaKStream$QueuePopulator.class */
    protected static class QueuePopulator<K> implements ForeachAction<K, GenericRow> {
        private final BlockingQueue<KeyValue<String, GenericRow>> queue;
        private final Optional<Integer> limit;
        private int counter = 0;

        QueuePopulator(BlockingQueue<KeyValue<String, GenericRow>> blockingQueue, Optional<Integer> optional) {
            this.queue = blockingQueue;
            this.limit = optional;
        }

        public void apply(K k, GenericRow genericRow) {
            String objects;
            if (genericRow == null) {
                return;
            }
            try {
                if (this.limit.isPresent()) {
                    this.counter++;
                    if (this.counter > this.limit.get().intValue()) {
                        throw new KsqlException("LIMIT reached for the partition.");
                    }
                }
                if (k instanceof Windowed) {
                    Windowed windowed = (Windowed) k;
                    objects = String.format("%s : %s", windowed.key(), windowed.window());
                } else {
                    objects = Objects.toString(k);
                }
                this.queue.put(new KeyValue<>(objects, genericRow));
            } catch (InterruptedException e) {
                throw new KsqlException("InterruptedException while enqueueing:" + k);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Object obj2) {
            apply((QueuePopulator<K>) obj, (GenericRow) obj2);
        }
    }

    private QueuedSchemaKStream(Schema schema, KStream kStream, Field field, List<SchemaKStream> list, SchemaKStream.Type type, FunctionRegistry functionRegistry, Optional<Integer> optional, OutputNode outputNode, SchemaRegistryClient schemaRegistryClient) {
        super(schema, kStream, field, list, type, functionRegistry, schemaRegistryClient);
        this.rowQueue = new LinkedBlockingQueue(100);
        setOutputNode(outputNode);
        kStream.foreach(new QueuePopulator(this.rowQueue, optional));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueuedSchemaKStream(SchemaKStream schemaKStream, Optional<Integer> optional) {
        this(schemaKStream.schema, schemaKStream.getKstream(), schemaKStream.keyField, schemaKStream.sourceSchemaKStreams, SchemaKStream.Type.SINK, schemaKStream.functionRegistry, optional, schemaKStream.outputNode(), schemaKStream.schemaRegistryClient);
    }

    public BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
        return this.rowQueue;
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKStream into(String str, Serde<GenericRow> serde, Set<Integer> set) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKStream filter(Expression expression) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKStream select(Schema schema) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKStream select(List<Pair<String, Expression>> list) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKStream leftJoin(SchemaKTable schemaKTable, Schema schema, Field field, KsqlTopicSerDe ksqlTopicSerDe, KsqlConfig ksqlConfig) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKStream selectKey(Field field, boolean z) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public SchemaKGroupedStream groupBy(Serde<String> serde, Serde<GenericRow> serde2, List<Expression> list) {
        throw new UnsupportedOperationException();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public Field getKeyField() {
        return super.getKeyField();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public Schema getSchema() {
        return super.getSchema();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public KStream getKstream() {
        return super.getKstream();
    }

    @Override // io.confluent.ksql.structured.SchemaKStream
    public List<SchemaKStream> getSourceSchemaKStreams() {
        return super.getSourceSchemaKStreams();
    }
}
