package org.apache.flume.sink.kite.policy;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.kite.DatasetSinkConstants;
import org.apache.flume.sink.kite.policy.FailurePolicy;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.View;

/* loaded from: input_file:org/apache/flume/sink/kite/policy/SavePolicy.class */
public class SavePolicy implements FailurePolicy {
    private final View<AvroFlumeEvent> dataset;
    private DatasetWriter<AvroFlumeEvent> writer;
    private int nEventsHandled;

    /* loaded from: input_file:org/apache/flume/sink/kite/policy/SavePolicy$Builder.class */
    public static class Builder implements FailurePolicy.Builder {
        @Override // org.apache.flume.sink.kite.policy.FailurePolicy.Builder
        public FailurePolicy build(Context context) {
            return new SavePolicy(context);
        }
    }

    private SavePolicy(Context context) {
        String string = context.getString(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI);
        Preconditions.checkArgument(string != null, "Must set kite.error.dataset.uri when kite.failurePolicy=save");
        if (Datasets.exists(string)) {
            this.dataset = Datasets.load(string, AvroFlumeEvent.class);
        } else {
            this.dataset = Datasets.create(string, new DatasetDescriptor.Builder().schema(AvroFlumeEvent.class).build(), AvroFlumeEvent.class);
        }
        this.nEventsHandled = 0;
    }

    @Override // org.apache.flume.sink.kite.policy.FailurePolicy
    public void handle(Event event, Throwable th) throws EventDeliveryException {
        try {
            if (this.writer == null) {
                this.writer = this.dataset.newWriter();
            }
            AvroFlumeEvent avroFlumeEvent = new AvroFlumeEvent();
            avroFlumeEvent.setBody(ByteBuffer.wrap(event.getBody()));
            avroFlumeEvent.setHeaders(toCharSeqMap(event.getHeaders()));
            this.writer.write(avroFlumeEvent);
            this.nEventsHandled++;
        } catch (RuntimeException e) {
            throw new EventDeliveryException(e);
        }
    }

    @Override // org.apache.flume.sink.kite.policy.FailurePolicy
    public void sync() throws EventDeliveryException {
        if (this.nEventsHandled > 0) {
            if (Formats.PARQUET.equals(this.dataset.getDataset().getDescriptor().getFormat())) {
                close();
            } else if (this.writer instanceof Syncable) {
                this.writer.sync();
            }
        }
    }

    @Override // org.apache.flume.sink.kite.policy.FailurePolicy
    public void close() throws EventDeliveryException {
        try {
            if (this.nEventsHandled > 0) {
                try {
                    this.writer.close();
                    this.writer = null;
                    this.nEventsHandled = 0;
                } catch (RuntimeException e) {
                    throw new EventDeliveryException(e);
                }
            }
        } catch (Throwable th) {
            this.writer = null;
            this.nEventsHandled = 0;
            throw th;
        }
    }

    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        return Maps.newHashMap(map);
    }
}
