/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.fs.marlin.listener;

import com.mapr.fs.marlin.MarlinTopicInfo;
import com.mapr.fs.marlin.jni.MarlinJniListener;
import com.mapr.fs.marlin.listener.ListenerRecord;
import com.mapr.fs.marlin.listener.NativeData;
import com.mapr.fs.proto.Marlinserver;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinListenerImpl
extends MarlinJniListener {
    private static final Logger LOG = LoggerFactory.getLogger(MarlinListenerImpl.class);
    private final ConsumerRebalanceCallback rebalanceCallback;
    private final Consumer kc;
    private boolean closed = false;

    private static void checkConsumerConfig(ConsumerConfig config) throws KafkaException {
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        try {
            String clientID = config.getString(mConfDef.getClientID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid client id configuration");
            throw e;
        }
        try {
            String groupID = config.getString(mConfDef.getGroupID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid group.id configuration");
            throw e;
        }
        try {
            boolean autoCommitEnable = config.getBoolean(mConfDef.getAutoCommitEnabled());
        }
        catch (ConfigException e) {
            LOG.error("Invalid auto commit enabled configuration");
            throw e;
        }
        try {
            Long autoCommitInt = config.getLong(mConfDef.getAutoCommitInterval());
            if (autoCommitInt < 0L) {
                throw new ConfigException(mConfDef.getAutoCommitInterval() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getAutoCommitInterval() + " configuration");
            throw e;
        }
        try {
            Long metadataMaxAge = config.getLong(mConfDef.getMetadataMaxAge());
            if (metadataMaxAge < 0L) {
                throw new ConfigException(mConfDef.getMetadataMaxAge() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getMetadataMaxAge() + " configuration");
            throw e;
        }
        try {
            Integer fetchMsgMaxBytes = config.getInt(mConfDef.getFetchMsgMaxBytesPerPartition());
            if (fetchMsgMaxBytes < 0) {
                throw new ConfigException(mConfDef.getFetchMsgMaxBytesPerPartition() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getFetchMsgMaxBytesPerPartition() + " configuration");
            throw e;
        }
        try {
            Integer fetchMinBytes = config.getInt(mConfDef.getFetchMinBytes());
            if (fetchMinBytes < 0) {
                throw new ConfigException(mConfDef.getFetchMinBytes() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getFetchMinBytes() + " configuration");
            throw e;
        }
        try {
            String autoOffsetReset = config.getString(mConfDef.getAutoOffsetReset());
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getAutoOffsetReset() + " configuration");
            throw e;
        }
    }

    public MarlinListenerImpl(ConsumerConfig config, Consumer kc, ConsumerRebalanceCallback callback) {
        LOG.debug("Starting Marlin Listener");
        this.rebalanceCallback = callback;
        this.kc = kc;
        MarlinListenerImpl.checkConsumerConfig(config);
        this._clntPtr = this.OpenListener(config);
        if (this._clntPtr == 0L) {
            throw new KafkaException("Could not create MarlinListener");
        }
        LOG.debug("Marlin listener created");
    }

    public void NativeRebalanceCb(NativeData data, boolean revoke) {
        if (this.rebalanceCallback == null) {
            return;
        }
        ArrayList<TopicPartition> feeds = new ArrayList<TopicPartition>();
        data.startParser();
        while (data.HasData()) {
            feeds.add(data.getNextTopicPartition());
        }
        if (revoke) {
            this.rebalanceCallback.onPartitionsRevoked(this.kc, feeds);
        } else {
            this.rebalanceCallback.onPartitionsAssigned(this.kc, feeds);
        }
    }

    public List<PartitionInfo> getTopicInfo(String topic) throws KafkaException {
        int nfeeds = this.GetTopicInfo(this._clntPtr, topic);
        if (nfeeds < 0) {
            throw new UnknownTopicOrPartitionException("could not get TopicInfo, err " + -nfeeds);
        }
        MarlinTopicInfo result = new MarlinTopicInfo(topic, nfeeds);
        return result.getKafkaPartitionInfo();
    }

    public Set<TopicPartition> subscriptions() {
        NativeData native_response = new NativeData();
        int err = this.SubscriptionList(this._clntPtr, native_response);
        if (err != 0) {
            throw new KafkaException("Could not get the subscription list");
        }
        HashSet<TopicPartition> result = new HashSet<TopicPartition>();
        native_response.startParser();
        while (native_response.HasData()) {
            result.add(native_response.getNextTopicPartition());
        }
        return result;
    }

    public void subscribe(String ... topics) throws KafkaException {
        String[] topicArr = new String[topics.length];
        int[] feedIdArr = new int[topics.length];
        int i = 0;
        String[] arr$ = topics;
        int len$ = arr$.length;
        for (int i$ = 0; i$ < len$; ++i$) {
            String topic;
            topicArr[i] = topic = arr$[i$];
            feedIdArr[i++] = -1;
        }
        int err = this.Subscribe(this._clntPtr, topicArr, feedIdArr);
        if (err != 0) {
            throw new KafkaException("Could not subscribe");
        }
    }

    public void unsubscribe(String ... topics) {
        String[] topicArr = new String[topics.length];
        int[] feedIdArr = new int[topics.length];
        int i = 0;
        String[] arr$ = topics;
        int len$ = arr$.length;
        for (int i$ = 0; i$ < len$; ++i$) {
            String topic;
            topicArr[i] = topic = arr$[i$];
            feedIdArr[i++] = -1;
        }
        int err = this.Unsubscribe(this._clntPtr, topicArr, feedIdArr);
        if (err != 0) {
            throw new KafkaException("Could not unsubscribe");
        }
    }

    public void subscribe(TopicPartition ... partitions) {
        String[] topicArr = new String[partitions.length];
        int[] feedIdArr = new int[partitions.length];
        int i = 0;
        for (TopicPartition partition : partitions) {
            topicArr[i] = partition.topic();
            feedIdArr[i++] = partition.partition();
        }
        int err = this.Subscribe(this._clntPtr, topicArr, feedIdArr);
        if (err != 0) {
            throw new KafkaException("Could not subscribe");
        }
    }

    public void unsubscribe(TopicPartition ... partitions) {
        String[] topicArr = new String[partitions.length];
        int[] feedIdArr = new int[partitions.length];
        int i = 0;
        for (TopicPartition partition : partitions) {
            topicArr[i] = partition.topic();
            feedIdArr[i++] = partition.partition();
        }
        int err = this.Unsubscribe(this._clntPtr, topicArr, feedIdArr);
        if (err != 0) {
            throw new KafkaException("Could not unsubscribe");
        }
    }

    public Map<TopicPartition, List<ListenerRecord>> poll(long timeout) {
        NativeData native_response = new NativeData();
        int err = this.Poll(this._clntPtr, timeout, native_response);
        if (err != 0 || native_response.error() != 0) {
            throw new KafkaException("poll failed");
        }
        return native_response.parseListenerRecords();
    }

    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
        String[] topicArr = new String[offsets.size()];
        int[] feedIdArr = new int[offsets.size()];
        long[] offsetArr = new long[offsets.size()];
        int i = 0;
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            topicArr[i] = partition.topic();
            feedIdArr[i] = partition.partition();
            offsetArr[i++] = entry.getValue();
        }
        int err = this.Commit(this._clntPtr, topicArr, feedIdArr, offsetArr, commitType == CommitType.SYNC);
        if (err != 0) {
            throw new KafkaException("Could not commit");
        }
    }

    public void commit(CommitType commitType) throws KafkaException {
        int err = this.CommitAll(this._clntPtr, commitType == CommitType.SYNC);
        if (err != 0) {
            throw new KafkaException("Could not commit");
        }
    }

    void seekInternal(long offset, TopicPartition ... partitions) throws KafkaException {
        String[] topicArr = new String[partitions.length];
        int[] feedIdArr = new int[partitions.length];
        long[] offsetArr = new long[partitions.length];
        int i = 0;
        for (TopicPartition partition : partitions) {
            topicArr[i] = partition.topic();
            feedIdArr[i] = partition.partition();
            offsetArr[i++] = offset;
        }
        int err = this.Seek(this._clntPtr, topicArr, feedIdArr, offsetArr);
        if (err != 0) {
            throw new KafkaException("Could not seek");
        }
    }

    public void seek(TopicPartition partition, long offset) {
        this.seekInternal(offset, partition);
    }

    public void seekToBeginning(TopicPartition ... partitions) {
        this.seekInternal(0L, partitions);
    }

    public void seekToEnd(TopicPartition ... partitions) {
        this.seekInternal(Long.MAX_VALUE, partitions);
    }

    public long position(TopicPartition partition) throws KafkaException {
        NativeData native_response = new NativeData();
        int err = this.QueryPosition(this._clntPtr, partition.topic(), partition.partition(), native_response);
        if (err != 0) {
            throw new KafkaException("Could not query position");
        }
        return native_response.long_data[0];
    }

    public long committed(TopicPartition partition) throws KafkaException {
        NativeData native_response = new NativeData();
        int err = this.QueryCursor(this._clntPtr, partition.topic(), partition.partition(), native_response);
        if (err != 0) {
            throw new KafkaException("Could not query committed offset");
        }
        return native_response.long_data[0];
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        MarlinListenerImpl marlinListenerImpl = this;
        synchronized (marlinListenerImpl) {
            if (this._clntPtr != 0L) {
                this.CloseListener(this._clntPtr);
            }
            this._clntPtr = 0L;
        }
    }
}

