package org.apache.kafka.streams.kstream.internals;

import java.util.Random;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest.class */
public class AbstractStreamTest {

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest$ExtendedKStream.class */
    /**
     * Minimal {@link AbstractStream} subclass used by the tests to show that the stream
     * abstraction can be extended with a custom operator ({@link #randomFilter()}).
     *
     * @param <K> key type of the wrapped stream
     * @param <V> value type of the wrapped stream
     */
    private static class ExtendedKStream<K, V> extends AbstractStream<K, V> {
        /**
         * Wraps an existing stream.
         *
         * @param kStream stream to extend; it is cast to {@code KStreamImpl}, so any other
         *                {@code KStream} implementation fails with a {@link ClassCastException}
         */
        ExtendedKStream(KStream<K, V> kStream) {
            super((KStreamImpl<K, V>) kStream);
        }

        /**
         * Appends a processor node backed by {@code ExtendedKStreamDummy} to this stream's graph
         * node and returns a new stream rooted at that node.
         *
         * @return a stream named after the freshly generated {@code RANDOM-FILTER-} processor name
         */
        KStream<K, V> randomFilter() {
            String processorName = this.builder.newProcessorName("RANDOM-FILTER-");
            ProcessorParameters<K, V, K, V> parameters =
                new ProcessorParameters<>(new ExtendedKStreamDummy<K, V>(), processorName);
            ProcessorGraphNode<K, V> filterNode = new ProcessorGraphNode<>(processorName, parameters);
            this.builder.addGraphNode(this.graphNode, filterNode);
            // No serdes are supplied for the derived stream; both serde arguments are null.
            return new KStreamImpl<>(
                processorName,
                (Serde<K>) null,
                (Serde<V>) null,
                this.subTopologySourceNodes,
                false,
                filterNode,
                this.builder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest$ExtendedKStreamDummy.class */
    /**
     * Processor supplier behind the random filter: every processor it creates either forwards a
     * record unchanged or drops it, depending on a coin flip from this supplier's {@link Random}.
     *
     * @param <K> key type of the records passing through
     * @param <V> value type of the records passing through
     */
    public static class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V, K, V> {
        // One generator per supplier instance; every processor created by it draws from this one.
        private final Random rand = new Random();

        /** Processor that forwards a record only when the enclosing supplier's generator yields {@code true}. */
        public class ExtendedKStreamDummyProcessor extends ContextualProcessor<K, V, K, V> {
            private ExtendedKStreamDummyProcessor() {
            }

            /**
             * Forwards {@code record} downstream or silently drops it, chosen at random.
             *
             * @param record the incoming record; forwarded as-is when kept
             */
            public void process(Record<K, V> record) {
                if (rand.nextBoolean()) {
                    context().forward(record);
                }
            }
        }

        ExtendedKStreamDummy() {
        }

        /**
         * Creates a new processor on every call.
         *
         * <p>The method has to be named exactly {@code get}: that is the supplier method declared
         * through {@code ProcessorSupplier}. Under any other name this class would not implement
         * the interface.
         *
         * @return a fresh {@code ExtendedKStreamDummyProcessor} bound to this supplier
         */
        public Processor<K, V, K, V> get() {
            return new ExtendedKStreamDummyProcessor();
        }
    }

    /**
     * Asks the supplier returned by {@code AbstractStream.toValueTransformerWithKeySupplier} for
     * three transformers and verifies that the wrapped {@code ValueTransformerSupplier} mock had
     * its {@code get()} invoked at least once, answering with a new {@code NoopValueTransformer}
     * each time.
     */
    @Test
    public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
        ValueTransformerSupplier delegate =
            (ValueTransformerSupplier) EasyMock.createMock(ValueTransformerSupplier.class);
        EasyMock.expect(delegate.get()).andAnswer(NoopValueTransformer::new).atLeastOnce();
        EasyMock.replay(delegate);

        ValueTransformerWithKeySupplier adapted = AbstractStream.toValueTransformerWithKeySupplier(delegate);
        for (int call = 0; call < 3; call++) {
            adapted.get();
        }

        EasyMock.verify(delegate);
    }

    /**
     * Sets up a {@code ValueTransformerWithKeySupplier} mock that answers {@code get()} with a
     * new {@code NoopValueTransformerWithKey}, calls {@code get()} three times and verifies the
     * expectation.
     *
     * <p>NOTE(review): the calls go straight to the mock; no {@code AbstractStream} method is
     * involved here. Confirm whether a conversion step was meant to sit between the mock and the
     * calls.
     */
    @Test
    public void testToInternalValueTransformerWithKeySupplierSuppliesNewTransformers() {
        ValueTransformerWithKeySupplier supplierMock =
            (ValueTransformerWithKeySupplier) EasyMock.createMock(ValueTransformerWithKeySupplier.class);
        EasyMock.expect(supplierMock.get()).andAnswer(NoopValueTransformerWithKey::new).atLeastOnce();
        EasyMock.replay(supplierMock);

        for (int call = 0; call < 3; call++) {
            supplierMock.get();
        }

        EasyMock.verify(supplierMock);
    }

    /**
     * Builds a topology in which an {@code ExtendedKStream} applies its random filter ahead of a
     * capturing processor, pipes seven records through it and checks that the processor saw no
     * more records than were sent.
     *
     * <p>The filter keeps or drops each record at random, so only this upper bound can be
     * asserted.
     */
    @Test
    public void testShouldBeExtensible() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] inputKeys = {1, 2, 3, 4, 5, 6, 7};
        MockApiProcessorSupplier<Integer, String, Void, Void> capturingSupplier = new MockApiProcessorSupplier<>();

        ExtendedKStream<Integer, String> extended = new ExtendedKStream<>(
            streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.Integer(), Serdes.String())));
        extended.randomFilter().process(capturingSupplier);

        // try-with-resources closes the driver exactly once, on success and on failure alike.
        try (TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
            TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(
                AssignmentTestUtils.TOPIC_PREFIX, new IntegerSerializer(), new StringSerializer());
            for (int key : inputKeys) {
                inputTopic.pipeInput(key, "V" + key);
            }
            Assert.assertTrue(capturingSupplier.theCapturedProcessor().processed().size() <= inputKeys.length);
        }
    }
}
