/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;

class WithAdapterSinkWriterOperatorTest
extends SinkWriterOperatorTestBase {
    WithAdapterSinkWriterOperatorTest() {
    }

    @Override
    InspectableSink sinkWithoutCommitter() {
        TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultSinkWriter();
        return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).build());
    }

    @Override
    InspectableSink sinkWithCommitter() {
        TestSinkV2.DefaultCommittingSinkWriter sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter();
        return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build());
    }

    @Override
    InspectableSink sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter();
        return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build());
    }

    @Override
    InspectableSink sinkWithState(boolean withState, String stateName) {
        TestSinkV2.DefaultStatefulSinkWriter sinkWriter = new TestSinkV2.DefaultStatefulSinkWriter();
        TestSinkV2.Builder builder = TestSinkV2.newBuilder().setWriter(sinkWriter).setDefaultCommitter().setWithPostCommitTopology(true);
        builder.setWriterState(withState);
        if (stateName != null) {
            builder.setCompatibleStateNames(stateName);
        }
        return new InspectableSink(builder.build());
    }

    static class InspectableSink
    extends SinkWriterOperatorTestBase.AbstractInspectableSink<Sink<Integer>> {
        private final TestSinkV2<Integer> sink;

        InspectableSink(TestSinkV2<Integer> sink) {
            super(sink);
            this.sink = sink;
        }

        @Override
        public long getLastCheckpointId() {
            return this.sink.getWriter().lastCheckpointId;
        }

        @Override
        public List<String> getRecordsOfCurrentCheckpoint() {
            return this.sink.getWriter().elements;
        }

        @Override
        public List<Watermark> getWatermarks() {
            return this.sink.getWriter().watermarks;
        }

        @Override
        public int getRecordCountFromState() {
            TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = this.sink.getWriter();
            if (sinkWriter instanceof TestSinkV2.DefaultStatefulSinkWriter) {
                return ((TestSinkV2.DefaultStatefulSinkWriter)sinkWriter).getRecordCount();
            }
            return 0;
        }
    }

    private static class TimeBasedBufferingSinkWriter
    extends TestSinkV2.DefaultStatefulSinkWriter<Integer>
    implements ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables = new ArrayList<String>();
        private ProcessingTimeService processingTimeService;

        private TimeBasedBufferingSinkWriter() {
        }

        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of((Object)element, (Object)context.timestamp(), (Object)context.currentWatermark()).toString());
        }

        public void onProcessingTime(long time) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimeService.registerTimer(time + 1000L, (ProcessingTimeService.ProcessingTimeCallback)this);
        }

        @Override
        public void init(WriterInitContext context) {
            this.processingTimeService = context.getProcessingTimeService();
            this.processingTimeService.registerTimer(1000L, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }
}

