/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tiered.storage.actions;

import java.io.PrintStream;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
import org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher;
import org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;

public final class ConsumeAction
implements TieredStorageTestAction {
    private final TopicPartition topicPartition;
    private final Long fetchOffset;
    private final Integer expectedTotalCount;
    private final Integer expectedFromSecondTierCount;
    private final RemoteFetchSpec remoteFetchSpec;
    private final Serde<String> serde = Serdes.String();

    public ConsumeAction(TopicPartition topicPartition, Long fetchOffset, Integer expectedTotalCount, Integer expectedFromSecondTierCount, RemoteFetchSpec remoteFetchSpec) {
        this.topicPartition = topicPartition;
        this.fetchOffset = fetchOffset;
        this.expectedTotalCount = expectedTotalCount;
        this.expectedFromSecondTierCount = expectedFromSecondTierCount;
        this.remoteFetchSpec = remoteFetchSpec;
    }

    @Override
    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
        LocalTieredStorageHistory history = context.tieredStorageHistory(this.remoteFetchSpec.getSourceBrokerId());
        Optional<LocalTieredStorageEvent> latestEventSoFar = history.latestEvent(LocalTieredStorageEvent.EventType.FETCH_SEGMENT, this.topicPartition);
        List<ConsumerRecord<String, String>> consumedRecords = context.consume(this.topicPartition, this.expectedTotalCount, this.fetchOffset);
        List<Record> tieredStorageRecords = TieredStorageTestUtils.tieredStorageRecords(context, this.topicPartition);
        Optional<Record> firstExpectedRecordOpt = tieredStorageRecords.stream().filter(record -> record.offset() >= this.fetchOffset).findFirst();
        if (!firstExpectedRecordOpt.isPresent()) {
            if (this.expectedFromSecondTierCount > 0) {
                Assertions.fail((String)("Could not find any record with offset >= " + this.fetchOffset + " from tier storage."));
            }
            return;
        }
        int indexOfFetchOffsetInTieredStorage = tieredStorageRecords.indexOf(firstExpectedRecordOpt.get());
        int recordsCountFromFirstIndex = tieredStorageRecords.size() - indexOfFetchOffsetInTieredStorage;
        Assertions.assertFalse((this.expectedFromSecondTierCount > recordsCountFromFirstIndex ? 1 : 0) != 0, (String)("Not enough records found in tiered storage from offset " + this.fetchOffset + " for " + this.topicPartition + ". Expected: " + this.expectedFromSecondTierCount + ", Was: " + recordsCountFromFirstIndex));
        Assertions.assertFalse((this.expectedFromSecondTierCount < recordsCountFromFirstIndex ? 1 : 0) != 0, (String)("Too many records found in tiered storage from offset " + this.fetchOffset + " for " + this.topicPartition + ". Expected: " + this.expectedFromSecondTierCount + ", Was: " + recordsCountFromFirstIndex));
        List<Record> storedRecords = tieredStorageRecords.subList(indexOfFetchOffsetInTieredStorage, tieredStorageRecords.size());
        List<ConsumerRecord<String, String>> readRecords = consumedRecords.subList(0, this.expectedFromSecondTierCount);
        MatcherAssert.assertThat(storedRecords, RecordsKeyValueMatcher.correspondTo(readRecords, this.topicPartition, this.serde, this.serde));
        List<LocalTieredStorageEvent> events = history.getEvents(LocalTieredStorageEvent.EventType.FETCH_SEGMENT, this.topicPartition);
        List<LocalTieredStorageEvent> eventsInScope = latestEventSoFar.map(latestEvent -> events.stream().filter(event -> event.isAfter((LocalTieredStorageEvent)latestEvent)).collect(Collectors.toList())).orElse(events);
        Assertions.assertEquals((int)this.remoteFetchSpec.getCount(), (int)eventsInScope.size(), (String)("Number of fetch requests from broker " + this.remoteFetchSpec.getSourceBrokerId() + " to the tier storage does not match the expected value for topic-partition " + this.remoteFetchSpec.getTopicPartition()));
    }

    @Override
    public void describe(PrintStream output) {
        output.println("consume-action:");
        output.println("  topic-partition = " + this.topicPartition);
        output.println("  fetch-offset = " + this.fetchOffset);
        output.println("  expected-record-count = " + this.expectedTotalCount);
        output.println("  expected-record-from-tiered-storage = " + this.expectedFromSecondTierCount);
    }
}

