/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.structured;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.structured.SchemaKGroupedStream;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

public class QueuedSchemaKStream
extends SchemaKStream {
    private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = new LinkedBlockingQueue<KeyValue<String, GenericRow>>(100);

    private QueuedSchemaKStream(Schema schema, KStream kstream, Field keyField, List<SchemaKStream> sourceSchemaKStreams, SchemaKStream.Type type, FunctionRegistry functionRegistry, Optional<Integer> limit, OutputNode outputNode, SchemaRegistryClient schemaRegistryClient) {
        super(schema, (KStream<String, GenericRow>)kstream, keyField, sourceSchemaKStreams, type, functionRegistry, schemaRegistryClient);
        this.setOutputNode(outputNode);
        kstream.foreach(new QueuePopulator(this.rowQueue, limit));
    }

    QueuedSchemaKStream(SchemaKStream schemaKStream, Optional<Integer> limit) {
        this(schemaKStream.schema, schemaKStream.getKstream(), schemaKStream.keyField, schemaKStream.sourceSchemaKStreams, SchemaKStream.Type.SINK, schemaKStream.functionRegistry, limit, schemaKStream.outputNode(), schemaKStream.schemaRegistryClient);
    }

    public BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
        return this.rowQueue;
    }

    @Override
    public SchemaKStream into(String kafkaTopicName, Serde<GenericRow> topicValueSerDe, Set<Integer> rowkeyIndexes) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SchemaKStream filter(Expression filterExpression) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SchemaKStream select(Schema selectSchema) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SchemaKStream select(List<Pair<String, Expression>> expressions) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SchemaKStream leftJoin(SchemaKTable schemaKTable, Schema joinSchema, Field joinKey, KsqlTopicSerDe joinSerDe, KsqlConfig ksqlConfig) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SchemaKStream selectKey(Field newKeyField, boolean updateRowKey) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SchemaKGroupedStream groupBy(Serde<String> keySerde, Serde<GenericRow> valSerde, List<Expression> groupByExpressions) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Field getKeyField() {
        return super.getKeyField();
    }

    @Override
    public Schema getSchema() {
        return super.getSchema();
    }

    @Override
    public KStream getKstream() {
        return super.getKstream();
    }

    @Override
    public List<SchemaKStream> getSourceSchemaKStreams() {
        return super.getSourceSchemaKStreams();
    }

    protected static class QueuePopulator<K>
    implements ForeachAction<K, GenericRow> {
        private final BlockingQueue<KeyValue<String, GenericRow>> queue;
        private final Optional<Integer> limit;
        private int counter = 0;

        QueuePopulator(BlockingQueue<KeyValue<String, GenericRow>> queue, Optional<Integer> limit) {
            this.queue = queue;
            this.limit = limit;
        }

        public void apply(K key, GenericRow row) {
            try {
                String keyString;
                if (row == null) {
                    return;
                }
                if (this.limit.isPresent()) {
                    ++this.counter;
                    if (this.counter > this.limit.get()) {
                        throw new KsqlException("LIMIT reached for the partition.");
                    }
                }
                if (key instanceof Windowed) {
                    Windowed windowedKey = (Windowed)key;
                    keyString = String.format("%s : %s", windowedKey.key(), windowedKey.window());
                } else {
                    keyString = Objects.toString(key);
                }
                this.queue.put((KeyValue<String, GenericRow>)new KeyValue((Object)keyString, (Object)row));
            }
            catch (InterruptedException exception) {
                throw new KsqlException("InterruptedException while enqueueing:" + key);
            }
        }
    }
}

