/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.kafka.eventstreams.impl.listener;

import com.google.protobuf.InvalidProtocolBufferException;
import com.mapr.baseutils.Errno;
import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.ShimLoader;
import com.mapr.fs.jni.MapRUserInfo;
import com.mapr.fs.jni.MarlinJniListener;
import com.mapr.fs.jni.NativeData;
import com.mapr.fs.proto.Dbserver;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.kafka.eventstreams.TopicRefreshListListener;
import com.mapr.kafka.eventstreams.TopicRefreshRegexListener;
import com.mapr.kafka.eventstreams.impl.MarlinClient;
import com.mapr.kafka.eventstreams.impl.MarlinTopicInfo;
import com.mapr.kafka.eventstreams.impl.listener.ListenerRecord;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListener;
import com.mapr.kafka.eventstreams.impl.listener.NativeDataParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinListenerImpl
extends MarlinJniListener {
    // Class-wide SLF4J logger; assigned in the static initializer at the bottom of the class.
    private static final Logger LOG;
    // Passed to OpenListener() in createJniListener() right after the per-partition fetch limit; assigned in the static initializer.
    private static final Integer DEFAULT_MAX_MSGS_BYTES;
    // Not referenced elsewhere in this file; beginningOffsets() searches with the literal timestamp 0L.
    public static final long EARLIEST_TIMESTAMP = 0L;
    // Set to true by close(); never read in this file.
    private boolean closed = false;
    // Read from the consumer config in the constructor and handed to OpenListener(); stays null if the setting cannot be read.
    protected String defaultStreamName;
    // Read from the consumer config in the constructor; forwarded to NativeDataParser.parseListenerRecords() by poll().
    private boolean recordStripStreamPath = false;
    // Read from the consumer config in the constructor; passed to OpenListener() and used as the default timeout
    // for endOffsets(), beginningOffsets() and offsetsForTimes().
    protected int rpcTimeoutMs = 0;
    // Interceptors supplied to the constructor (may be null); handed to every commit callback wrapper.
    private ConsumerInterceptors<?, ?> interceptors;
    // Source of the "no timestamp" sentinel used by offsetsForTimes().
    protected final Marlinserver.MarlinInternalDefaults marlinInternalDefaults = Marlinserver.MarlinInternalDefaults.getDefaultInstance();
    // Created in the constructor when interceptors are present; not referenced anywhere else in this file.
    protected MarlinCommitCallbackWrapperImpl autoCommitCbWrapper = null;
    // NOTE(review): final and fixed to COFT_NONE; the constructor's cdcOFType argument is never stored here.
    private final Dbserver.CDCOpenFormatType _cdcOFType = Dbserver.CDCOpenFormatType.COFT_NONE;

    /**
     * Returns the CDC open-format type held by this listener.
     * <p>
     * NOTE(review): the backing field is {@code final} and initialised to {@code COFT_NONE}; the
     * {@code cdcOFType} given to the constructor is forwarded to the native listener but is not
     * stored in that field, so this getter always reports {@code COFT_NONE} - confirm this is intended.
     *
     * @return the value of the {@code _cdcOFType} field
     */
    public Dbserver.CDCOpenFormatType getCDCOpenFormatType() {
        return _cdcOFType;
    }

    /**
     * Validates the consumer settings this listener depends on, in a fixed order: client id,
     * group id, auto-commit flag, auto-commit interval, metadata max age, per-partition fetch
     * limit, minimum fetch bytes, auto offset reset and the record-strip-stream-path flag.
     * <p>
     * Each setting is read once; the four numeric settings must additionally be non-negative.
     * Any {@link ConfigException} is logged at error level and rethrown unchanged.
     *
     * @param config consumer configuration to validate
     * @throws KafkaException (as {@link ConfigException}) when a setting cannot be read or is negative
     */
    protected void checkConsumerConfig(ConsumerConfig config) throws KafkaException {
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        try {
            config.getString(mConfDef.getClientID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid client id configuration");
            throw e;
        }
        try {
            config.getString(mConfDef.getGroupID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid group.id configuration");
            throw e;
        }
        try {
            config.getBoolean(mConfDef.getAutoCommitEnabled());
        }
        catch (ConfigException e) {
            LOG.error("Invalid auto commit enabled configuration");
            throw e;
        }
        this.requireNonNegativeLong(config, mConfDef.getAutoCommitInterval());
        this.requireNonNegativeLong(config, mConfDef.getMetadataMaxAge());
        this.requireNonNegativeInt(config, mConfDef.getFetchMsgMaxBytesPerPartition());
        this.requireNonNegativeInt(config, mConfDef.getFetchMinBytes());
        try {
            config.getString(mConfDef.getAutoOffsetReset());
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getAutoOffsetReset() + " configuration");
            throw e;
        }
        try {
            config.getBoolean(mConfDef.getRecordStripStreamPath());
        }
        catch (ConfigException e) {
            LOG.error(mConfDef.getRecordStripStreamPath() + " configuration");
            throw e;
        }
    }

    /** Reads {@code key} as a long and rejects negative values; logs the key before rethrowing. */
    private void requireNonNegativeLong(ConsumerConfig config, String key) {
        try {
            Long value = config.getLong(key);
            if (value < 0L) {
                // Thrown inside the try on purpose so the catch below logs it like a read failure.
                throw new ConfigException(key + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error(key + " configuration");
            throw e;
        }
    }

    /** Reads {@code key} as an int and rejects negative values; logs the key before rethrowing. */
    private void requireNonNegativeInt(ConsumerConfig config, String key) {
        try {
            Integer value = config.getInt(key);
            if (value < 0) {
                // Thrown inside the try on purpose so the catch below logs it like a read failure.
                throw new ConfigException(key + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error(key + " configuration");
            throw e;
        }
    }

    /**
     * Opens the native listener and stores its handle in {@code _clntPtr}.
     * <p>
     * All settings are read from {@code config} in the order they are passed to
     * {@code OpenListener}; the seventh argument is always {@code null} and the last one is
     * always {@code Integer.MAX_VALUE}.
     *
     * @param config    consumer configuration supplying the listener settings
     * @param hardMount forwarded unchanged to the native call
     * @param userInfo  forwarded unchanged to the native call
     * @param cdcOFType its {@code ordinal()} is forwarded to the native call
     * @throws NetworkException when the native call returns a zero handle
     */
    protected void createJniListener(ConsumerConfig config, boolean hardMount, MapRUserInfo userInfo, Dbserver.CDCOpenFormatType cdcOFType) throws NetworkException {
        Marlinserver.MarlinConfigDefaults defaults = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        // Locals keep the boxed types returned by ConsumerConfig so the call binds exactly as before.
        String clientId = config.getString(defaults.getClientID());
        String groupId = config.getString(defaults.getGroupID());
        Boolean autoCommitEnabled = config.getBoolean(defaults.getAutoCommitEnabled());
        Long autoCommitInterval = config.getLong(defaults.getAutoCommitInterval());
        Long metadataMaxAge = config.getLong(defaults.getMetadataMaxAge());
        Integer fetchMaxBytesPerPartition = config.getInt(defaults.getFetchMsgMaxBytesPerPartition());
        Integer fetchMinBytes = config.getInt(defaults.getFetchMinBytes());
        Integer fetchMaxWaitMs = config.getInt(defaults.getFetchMaxWaitMs());
        String autoOffsetReset = config.getString(defaults.getAutoOffsetReset());
        Long consumerBufferMemory = config.getLong(defaults.getConsumerBufferMemory());
        Boolean negativeOffsetOnEof = config.getBoolean(defaults.getNegativeOffsetOnEof());
        this._clntPtr = this.OpenListener(
                clientId,
                groupId,
                this.rpcTimeoutMs,
                hardMount,
                autoCommitEnabled,
                autoCommitInterval,
                null,
                metadataMaxAge,
                fetchMaxBytesPerPartition,
                DEFAULT_MAX_MSGS_BYTES,
                fetchMinBytes,
                fetchMaxWaitMs,
                autoOffsetReset,
                this.defaultStreamName,
                consumerBufferMemory,
                negativeOffsetOnEof,
                userInfo,
                cdcOFType.ordinal(),
                Integer.MAX_VALUE);
        if (this._clntPtr == 0L) {
            throw new NetworkException("Could not create Consumer. Please ensure that the CLDB service is configured properly and is available");
        }
    }

    /**
     * Creates a listener: validates {@code config}, reads the optional settings (default stream,
     * record-strip-stream-path, RPC timeout, hard mount), resolves the current user and opens
     * the native listener.
     * <p>
     * Each optional setting is read in its own try block so that a failure to read one of them
     * no longer prevents the next one from being applied; a setting that cannot be read keeps
     * its default and the reason is logged at debug level.
     *
     * @param config    consumer configuration
     * @param ints      consumer interceptors, or {@code null} when none are configured
     * @param cdcOFType CDC open-format type forwarded to the native listener; must not be {@code null}
     * @throws KafkaException when the configuration is invalid, the current user cannot be
     *                        determined, or the native listener cannot be created
     */
    public MarlinListenerImpl(ConsumerConfig config, ConsumerInterceptors<?, ?> ints, Dbserver.CDCOpenFormatType cdcOFType) {
        MapRUserInfo userInfo;
        LOG.debug("Starting Streams Listener with cdcOFType {}, ordinal {}", cdcOFType, cdcOFType.ordinal());
        this.interceptors = ints;
        this.autoCommitCbWrapper = this.interceptors == null ? null : new MarlinCommitCallbackWrapperImpl(null, this.interceptors);
        this.checkConsumerConfig(config);
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        this.defaultStreamName = null;
        try {
            this.defaultStreamName = config.getString(mConfDef.getConsumerDefaultStream());
        }
        catch (ConfigException e) {
            LOG.debug("Default stream setting not readable, leaving it unset: {}", e.getMessage());
        }
        try {
            this.recordStripStreamPath = config.getBoolean(mConfDef.getRecordStripStreamPath());
        }
        catch (ConfigException e) {
            LOG.debug("Record strip stream path setting not readable, keeping default: {}", e.getMessage());
        }
        boolean hardMount = true;
        try {
            this.rpcTimeoutMs = config.getInt(mConfDef.getRpcTimeout());
        }
        catch (ConfigException e) {
            LOG.debug("RPC timeout setting not readable, keeping default: {}", e.getMessage());
        }
        try {
            hardMount = config.getBoolean(mConfDef.getHardMount());
        }
        catch (ConfigException e) {
            LOG.debug("Hard mount setting not readable, keeping default: {}", e.getMessage());
        }
        try {
            userInfo = MapRFileSystem.CurrentUserInfo();
        }
        catch (IOException e) {
            throw new KafkaException("Could not create MarlinListener", (Throwable)e);
        }
        this.createJniListener(config, hardMount, userInfo, cdcOFType);
        LOG.debug("Streams listener created");
    }

    /**
     * Lists the partitions the native listener reports as paused.
     *
     * @return the paused partitions, or an empty set when the native call returns a non-zero code
     */
    public Set<TopicPartition> paused() {
        Set<TopicPartition> pausedPartitions = new HashSet<>();
        NativeData response = new NativeData();
        if (this.Paused(this._clntPtr, response) != 0) {
            // A failed query is reported as "nothing paused" rather than as an exception.
            return pausedPartitions;
        }
        for (NativeDataParser parser = this.getNativeDataParser(response); parser.HasData(); ) {
            pausedPartitions.add(parser.getNextTopicPartition());
        }
        return pausedPartitions;
    }

    /**
     * Lists the partitions currently assigned according to the native listener.
     *
     * @return the assigned partitions, or an empty set when the native call returns a non-zero code
     */
    public Set<TopicPartition> assignment() {
        Set<TopicPartition> assigned = new HashSet<>();
        NativeData response = new NativeData();
        if (this.AssignmentList(this._clntPtr, response) != 0) {
            // A failed query is reported as "nothing assigned" rather than as an exception.
            return assigned;
        }
        for (NativeDataParser parser = this.getNativeDataParser(response); parser.HasData(); ) {
            assigned.add(parser.getNextTopicPartition());
        }
        return assigned;
    }

    /**
     * Builds the parser used to decode a native response. Protected, so a subclass can supply
     * a different parser.
     *
     * @param native_response data filled in by a native call
     * @return a new parser over {@code native_response}
     */
    protected NativeDataParser getNativeDataParser(NativeData native_response) {
        NativeDataParser parser = new NativeDataParser(native_response);
        return parser;
    }

    /**
     * Lists the topic names the native listener reports as subscribed.
     *
     * @return the subscribed topic names, or an empty set when the native call returns a non-zero code
     */
    public Set<String> subscription() {
        MarlinStringArrayWrapperImpl names = new MarlinStringArrayWrapperImpl();
        boolean failed = this.SubscriptionList(this._clntPtr, names) != 0;
        if (failed) {
            return new HashSet<String>();
        }
        return names.GetStringSet();
    }

    /**
     * Subscribes to an explicit list of topics.
     *
     * @param topics   topic names to subscribe to
     * @param callback rebalance listener, or {@code null} for none
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) throws KafkaException {
        String[] names = new String[topics.size()];
        int next = 0;
        for (String name : topics) {
            names[next] = name;
            ++next;
        }
        // The native layer receives null rather than a wrapper around a null listener.
        MarlinRebalanceCallbackWrapperImpl wrapper = callback == null ? null : new MarlinRebalanceCallbackWrapperImpl(callback);
        int err = this.SubscribeTopics(this._clntPtr, names, wrapper);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "Could not subscribe to topics");
        }
    }

    /**
     * Assigns the given partitions to this listener.
     *
     * @param partitions partitions to assign; flattened into parallel topic / partition-id arrays
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void assign(Collection<TopicPartition> partitions) {
        final int count = partitions.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        int slot = 0;
        for (TopicPartition tp : partitions) {
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            ++slot;
        }
        int err = this.AssignFeeds(this._clntPtr, topicNames, feedIds);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not assign partitions");
    }

    /**
     * Subscribes to every topic matching a regular expression.
     *
     * @param pattern  topic pattern; its string form is validated with {@code checkPatternValid}
     * @param callback rebalance listener, or {@code null} for none
     * @throws KafkaException when the pattern is rejected (reported with error code 22) or the
     *                        native call returns a non-zero code
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        MarlinRebalanceCallbackWrapperImpl wrapper = callback == null ? null : new MarlinRebalanceCallbackWrapperImpl(callback);
        String regex = pattern.toString();
        boolean valid = this.checkPatternValid(regex);
        if (!valid) {
            throw MarlinClient.jniErrToException(22, "Could not subscribe, as invalid pattern " + regex + " is passed");
        }
        int err = this.SubscribeRegex(this._clntPtr, regex, wrapper);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not subscribe to pattern " + regex);
    }

    /**
     * Drops the current subscription through the native listener.
     *
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void unsubscribe() {
        int err = this.Unsubscribe(this._clntPtr);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "unsubscribe failed");
    }

    /**
     * Fetches records from the native listener.
     * <p>
     * A timeout of {@code 0} returns an empty map without calling the native layer.
     *
     * @param timeout poll timeout handed to the native call
     * @return records grouped by partition, parsed from the native response
     * @throws KafkaException when either the native return code or the error carried in the
     *                        response is non-zero
     */
    public Map<TopicPartition, List<ListenerRecord>> poll(long timeout) {
        if (timeout == 0L) {
            return new HashMap<TopicPartition, List<ListenerRecord>>();
        }
        NativeData native_response = new NativeData();
        int err = this.Poll(this._clntPtr, timeout, native_response);
        if (err != 0 || native_response.error() != 0) {
            // Report whichever code is actually non-zero. Building the exception from a zero
            // return code when only the response carries an error would lose the real code
            // (and jniErrToException appears to yield null for 0 - TODO confirm).
            int failure = err != 0 ? err : (int)native_response.error();
            throw MarlinClient.jniErrToException(failure, "poll failed");
        }
        NativeDataParser nativeDataParser = this.getNativeDataParser(native_response);
        return nativeDataParser.parseListenerRecords(this.recordStripStreamPath);
    }

    /**
     * Commits all current positions synchronously ({@code CommitAll} is called with {@code true}).
     * A commit callback wrapper is only created when interceptors are configured.
     *
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void commitSync() {
        MarlinCommitCallbackWrapperImpl wrapper = null;
        if (this.interceptors != null) {
            wrapper = new MarlinCommitCallbackWrapperImpl(null, this.interceptors);
        }
        int err = this.CommitAll(this._clntPtr, true, wrapper);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not commitSync()");
    }

    /**
     * Commits the given offsets synchronously by delegating to {@code commitMap} with
     * {@code true}. A commit callback wrapper is only created when interceptors are configured.
     *
     * @param offsets offsets and metadata to commit, keyed by partition
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        MarlinCommitCallbackWrapperImpl wrapper = null;
        if (this.interceptors != null) {
            wrapper = new MarlinCommitCallbackWrapperImpl(null, this.interceptors);
        }
        this.commitMap(offsets, true, wrapper);
    }

    /**
     * Commits all current positions asynchronously ({@code CommitAll} is called with
     * {@code false}). A commit callback wrapper is only created when interceptors are configured.
     *
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void commitAsync() {
        MarlinCommitCallbackWrapperImpl cbwrapper = this.interceptors == null ? null : new MarlinCommitCallbackWrapperImpl(null, this.interceptors);
        int err = this.CommitAll(this._clntPtr, false, cbwrapper);
        if (err != 0) {
            // Message previously named commitSync(), which pointed at the wrong call site.
            throw MarlinClient.jniErrToException(err, "Could not commitAsync()");
        }
    }

    /**
     * Commits all current positions asynchronously and reports completion to {@code callback}.
     * A wrapper is always created here, even when both the callback and the interceptors are
     * {@code null}.
     *
     * @param callback completion callback; may be {@code null}
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void commitAsync(OffsetCommitCallback callback) {
        MarlinCommitCallbackWrapperImpl wrapper = new MarlinCommitCallbackWrapperImpl(callback, this.interceptors);
        int err = this.CommitAll(this._clntPtr, false, wrapper);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not commitAsync(callback)");
    }

    /**
     * Commits the given offsets asynchronously by delegating to {@code commitMap} with
     * {@code false}; completion is reported to {@code callback} through a wrapper.
     *
     * @param offsets  offsets and metadata to commit, keyed by partition
     * @param callback completion callback; may be {@code null}
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        MarlinCommitCallbackWrapperImpl wrapper = new MarlinCommitCallbackWrapperImpl(callback, this.interceptors);
        this.commitMap(offsets, false, wrapper);
    }

    /**
     * Registers a topic-refresh listener for a pattern.
     *
     * @param pattern  topic pattern, or {@code null}, in which case a {@code null} pattern string
     *                 is passed to the native call without validation
     * @param callback listener notified with the updated topic names, or {@code null} for none
     * @throws KafkaException when a non-null pattern is rejected (reported with error code 22)
     *                        or the native call returns a non-zero code
     */
    public void topicRefresherRegex(Pattern pattern, TopicRefreshRegexListener callback) {
        MarlinTopicRefreshRegexListenerWrapperImpl wrapper = callback == null ? null : new MarlinTopicRefreshRegexListenerWrapperImpl(callback);
        String regex = null;
        if (pattern != null) {
            regex = pattern.toString();
            if (!this.checkPatternValid(regex)) {
                throw MarlinClient.jniErrToException(22, "Could not refresh topics, as invalid pattern " + regex + " is passed");
            }
        }
        int err = this.TopicRefresherRegex(this._clntPtr, regex, wrapper);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not refresh topics for pattern " + regex);
    }

    /**
     * Registers a topic-refresh listener for an explicit list of topics.
     *
     * @param topics   topic names to watch
     * @param callback listener notified with the updated partitions, or {@code null} for none
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void topicRefresherList(Collection<String> topics, TopicRefreshListListener callback) {
        String[] names = new String[topics.size()];
        int next = 0;
        for (String name : topics) {
            names[next] = name;
            ++next;
        }
        MarlinTopicRefreshListListenerWrapperImpl wrapper = callback == null ? null : new MarlinTopicRefreshListListenerWrapperImpl(callback);
        int err = this.TopicRefresherList(this._clntPtr, names, wrapper);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "Could not refresh the list of topics");
        }
    }

    /**
     * Flattens an offset map into parallel arrays and hands them to the native {@code Commit} call.
     *
     * @param offsets    offsets and metadata to commit, keyed by partition
     * @param commitType forwarded to the native call; the synchronous callers in this class
     *                   pass {@code true}, the asynchronous ones {@code false}
     * @param callback   commit callback wrapper, or {@code null}
     * @throws KafkaException when the native call returns a non-zero code
     */
    private void commitMap(Map<TopicPartition, OffsetAndMetadata> offsets, boolean commitType, MarlinCommitCallbackWrapperImpl callback) {
        final int count = offsets.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        long[] positions = new long[count];
        String[] metadata = new String[count];
        int slot = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
            TopicPartition tp = e.getKey();
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            positions[slot] = e.getValue().offset();
            metadata[slot] = e.getValue().metadata();
            ++slot;
        }
        int err = this.Commit(this._clntPtr, topicNames, feedIds, positions, metadata, commitType, callback);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not commit");
    }

    /**
     * Seeks every given partition to the same offset.
     *
     * @param offset     offset applied to each partition
     * @param partitions partitions to reposition
     * @throws KafkaException when the native call returns a non-zero code
     */
    private void seekInternal(long offset, Collection<TopicPartition> partitions) throws KafkaException {
        final int count = partitions.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        long[] targets = new long[count];
        int slot = 0;
        for (TopicPartition tp : partitions) {
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            targets[slot] = offset;
            ++slot;
        }
        int err = this.Seek(this._clntPtr, topicNames, feedIds, targets);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Could not seek");
    }

    /**
     * Seeks a single partition to {@code offset}.
     *
     * @param partition partition to reposition
     * @param offset    target offset
     */
    public void seek(TopicPartition partition, long offset) {
        List<TopicPartition> single = Arrays.asList(partition);
        this.seekInternal(offset, single);
    }

    /**
     * Seeks the given partitions to offset {@code 0}.
     *
     * @param partitions partitions to reposition
     */
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        final long firstOffset = 0L;
        this.seekInternal(firstOffset, partitions);
    }

    /**
     * Seeks the given partitions using {@code Long.MAX_VALUE} as the target offset.
     *
     * @param partitions partitions to reposition
     */
    public void seekToEnd(Collection<TopicPartition> partitions) {
        final long endMarker = Long.MAX_VALUE;
        this.seekInternal(endMarker, partitions);
    }

    /**
     * Looks up end offsets using the configured RPC timeout.
     *
     * @param partitions partitions to query
     * @return end offset per partition
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        int defaultTimeoutMs = this.rpcTimeoutMs;
        return this.endOffsets(partitions, defaultTimeoutMs);
    }

    /**
     * Looks up end offsets for the given partitions.
     *
     * @param partitions partitions to query
     * @param timeoutMS  timeout handed to the native call
     * @return one entry per queried partition, holding the offset the native call wrote for it
     * @throws KafkaException when the native call returns a non-zero code
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, int timeoutMS) {
        final int count = partitions.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        long[] offsets = new long[count];
        int slot = 0;
        for (TopicPartition tp : partitions) {
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            ++slot;
        }
        int err = this.EndOffsets(this._clntPtr, topicNames, feedIds, offsets, timeoutMS);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "endOffsets failed");
        }
        Map<TopicPartition, Long> result = new HashMap<>();
        for (int idx = 0; idx < offsets.length; ++idx) {
            TopicPartition key = new TopicPartition(topicNames[idx], feedIds[idx]);
            result.put(key, offsets[idx]);
        }
        return result;
    }

    /**
     * Looks up beginning offsets using the configured RPC timeout.
     *
     * @param partitions partitions to query
     * @return beginning offset per partition
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        int defaultTimeoutMs = this.rpcTimeoutMs;
        return this.beginningOffsets(partitions, defaultTimeoutMs);
    }

    /**
     * Looks up beginning offsets by asking {@code offsetsForTimes} for timestamp {@code 0} on
     * every partition.
     *
     * @param partitions partitions to query
     * @param timeoutMS  timeout handed to {@code offsetsForTimes}
     * @return one entry per partition; partitions missing from the lookup result map to offset {@code 0}
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, int timeoutMS) {
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, 0L);
        }
        Map<TopicPartition, OffsetAndTimestamp> found = this.offsetsForTimes(query, timeoutMS);
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndTimestamp hit = found.get(tp);
            long offset;
            if (hit == null) {
                offset = 0L;
            } else {
                offset = hit.offset();
            }
            // Keys are fresh copies, not the caller's TopicPartition instances.
            result.put(new TopicPartition(tp.topic(), tp.partition()), offset);
        }
        return result;
    }

    /**
     * Queries the current position of a partition.
     *
     * @param partition partition to query
     * @return the first element of the native response's {@code long_data}
     * @throws KafkaException when the native call returns a non-zero code
     */
    public long position(TopicPartition partition) throws KafkaException {
        NativeData response = new NativeData();
        String topic = partition.topic();
        int feedId = partition.partition();
        int err = this.QueryPosition(this._clntPtr, topic, feedId, response);
        if (err == 0) {
            return response.long_data[0];
        }
        throw MarlinClient.jniErrToException(err, "Could not query position");
    }

    /**
     * Queries the committed offset and metadata of a partition.
     *
     * @param partition partition to query
     * @return the committed offset and metadata, or {@code null} when the native call returns
     *         code 42 (presumably "nothing committed yet" - confirm against the native layer)
     * @throws KafkaException for any other non-zero native return code
     */
    public OffsetAndMetadata committed(TopicPartition partition) throws KafkaException {
        MarlinOffsetAndMetadataWrapperImpl holder = new MarlinOffsetAndMetadataWrapperImpl();
        int err = this.QueryCursor(this._clntPtr, partition.topic(), partition.partition(), holder);
        switch (err) {
            case 0:
                return holder.getOffsetAndMetadata();
            case 42:
                return null;
            default:
                throw MarlinClient.jniErrToException(err, "Could not query committed offset");
        }
    }

    /**
     * Returns partition information for a topic.
     *
     * @param topic topic name
     * @return partition info built from the partition count reported by the native call
     * @throws KafkaException (as {@link UnknownTopicOrPartitionException}) when the native call
     *                        returns a negative value; the message carries its absolute value
     */
    public List<PartitionInfo> getTopicInfo(String topic) throws KafkaException {
        int feedCount = this.GetTopicInfo(this._clntPtr, topic);
        if (feedCount >= 0) {
            return new MarlinTopicInfo(topic, feedCount).getKafkaPartitionInfo();
        }
        throw new UnknownTopicOrPartitionException("could not get TopicInfo, err " + -feedCount);
    }

    /**
     * Lists topics without naming a stream: delegates to {@code listTopics(String)} with a
     * {@code null} stream.
     *
     * @return partition info per topic
     */
    public Map<String, List<PartitionInfo>> listTopics() {
        String noStream = null;
        return this.listTopics(noStream);
    }

    /**
     * Lists topics for a pattern: validates the pattern's string form and passes that string to
     * {@code listTopics(String)}.
     *
     * @param pattern topic pattern; must not be {@code null}
     * @return partition info per topic
     * @throws KafkaException when the pattern is rejected (reported with error code 22)
     */
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern) {
        String regex = pattern.toString();
        if (this.checkPatternValid(regex)) {
            return this.listTopics(regex);
        }
        throw MarlinClient.jniErrToException(22, "Could not listTopics, as invalid pattern " + regex + " is passed");
    }

    /**
     * Lists the topics of a stream together with their partition info.
     * <p>
     * The native lookup is retried exactly once when it returns code 116 (presumably a stale
     * handle - confirm against the native layer). Failures are logged and reported as an empty
     * map instead of an exception.
     *
     * @param stream stream argument handed to the native call; may be {@code null}
     * @return partition info per topic; empty when the lookup fails or the topic and
     *         partition-count arrays disagree in length
     */
    public Map<String, List<PartitionInfo>> listTopics(String stream) {
        Map<String, List<PartitionInfo>> byTopic = new HashMap<>();
        MarlinStringArrayWrapperImpl names = new MarlinStringArrayWrapperImpl();
        int err = this.GetTopicsFromStream(this._clntPtr, stream, names);
        if (err == 116) {
            err = this.GetTopicsFromStream(this._clntPtr, stream, names);
        }
        if (err != 0) {
            LOG.debug("Could not get list of topics for stream " + stream + ", err " + err);
            return byTopic;
        }
        List<String> topics = names.GetStringList();
        int[] partitionCounts = names.GetNumPartitions();
        if (topics.size() != partitionCounts.length) {
            LOG.error("Could not get list of topics for stream " + stream + ", got " + topics.size() + " topics, but got " + partitionCounts.length + " partitions");
            return byTopic;
        }
        int idx = 0;
        for (Iterator<String> it = topics.iterator(); it.hasNext(); ++idx) {
            String topic = it.next();
            MarlinTopicInfo info = new MarlinTopicInfo(topic, partitionCounts[idx]);
            byTopic.put(topic, info.getKafkaPartitionInfo());
        }
        return byTopic;
    }

    /**
     * Pauses fetching for the given partitions.
     *
     * @param partitions partitions to pause
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void pause(Collection<TopicPartition> partitions) {
        final int count = partitions.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        int slot = 0;
        for (TopicPartition tp : partitions) {
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            ++slot;
        }
        int err = this.Pause(this._clntPtr, topicNames, feedIds);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Error while pausing topic partitions " + partitions);
    }

    /**
     * Resumes fetching for the given partitions.
     *
     * @param partitions partitions to resume
     * @throws KafkaException when the native call returns a non-zero code
     */
    public void resume(Collection<TopicPartition> partitions) {
        final int count = partitions.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        int slot = 0;
        for (TopicPartition tp : partitions) {
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            ++slot;
        }
        int err = this.Resume(this._clntPtr, topicNames, feedIds);
        if (err == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(err, "Error while resuming topic partitions " + partitions);
    }

    /**
     * Sends a join-group request through the native listener and decodes the reply.
     *
     * @param joinDesc join description; serialised with {@code toByteArray()}
     * @param cb       join callback, or {@code null} for none
     * @return the decoded join response
     * @throws KafkaException when the native call yields no bytes or the bytes cannot be parsed;
     *                        a parse failure is attached as the cause
     */
    public Marlinserver.JoinGroupResponse join(Marlinserver.JoinGroupDesc joinDesc, MarlinListener.MarlinJoinCallback cb) {
        byte[] serialDesc = joinDesc.toByteArray();
        MarlinJoinCallbackWrapperImpl cbWrapper = cb == null ? null : new MarlinJoinCallbackWrapperImpl(cb);
        byte[] resp = this.Join(this._clntPtr, serialDesc, cbWrapper);
        if (resp == null) {
            // Surface a missing reply as the documented exception type instead of an NPE from the parser.
            throw new KafkaException("Error parsing Join response: no response received");
        }
        try {
            return Marlinserver.JoinGroupResponse.parseFrom((byte[])resp);
        }
        catch (InvalidProtocolBufferException e) {
            // Keep the parse failure as the cause rather than printing it to stderr.
            throw new KafkaException("Error parsing Join response", (Throwable)e);
        }
    }

    /**
     * Closes the native listener, if one is open, and clears the handle. Runs while holding
     * this object's monitor.
     * <p>
     * NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so the original source may have contained a try block here.
     *
     * @param timeout  close timeout
     * @param timeUnit unit of {@code timeout}; converted to milliseconds for the native call
     */
    public void close(long timeout, TimeUnit timeUnit) {
        synchronized (this) {
            if (this._clntPtr != 0L) {
                long timeoutMs = timeUnit.toMillis(timeout);
                this.CloseListener(this._clntPtr, timeoutMs);
            }
            this._clntPtr = 0L;
            this.closed = true;
        }
    }

    /**
     * Forwards a wakeup request to the native listener; any value returned by the native call
     * is ignored.
     */
    public void wakeup() {
        Wakeup(_clntPtr);
    }

    /**
     * Looks up offsets by timestamp using the configured RPC timeout.
     *
     * @param timestampsToSearch timestamp to search for, per partition
     * @return offset and timestamp per partition for which a result was found
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        int defaultTimeoutMs = this.rpcTimeoutMs;
        return this.offsetsForTimes(timestampsToSearch, defaultTimeoutMs);
    }

    /**
     * Looks up offsets by timestamp.
     *
     * @param timestampsToSearch timestamp to search for, per partition
     * @param timeoutMS          timeout handed to the native call
     * @return offset and timestamp per partition; partitions whose returned timestamp equals
     *         the internal "no timestamp" value are left out
     * @throws KafkaException when the native call returns a non-zero code; code 38 gets a
     *                        dedicated message asking to complete the server upgrade
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, int timeoutMS) {
        final int count = timestampsToSearch.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        long[] searchTimes = new long[count];
        long[] foundOffsets = new long[count];
        long[] foundTimes = new long[count];
        int slot = 0;
        for (Map.Entry<TopicPartition, Long> e : timestampsToSearch.entrySet()) {
            TopicPartition tp = e.getKey();
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            searchTimes[slot] = e.getValue();
            ++slot;
        }
        int err = this.OffsetsForTimes(this._clntPtr, topicNames, feedIds, timeoutMS, searchTimes, foundOffsets, foundTimes);
        if (err == 38) {
            throw new KafkaException(Errno.toString((int)err) + " (" + err + ")  Please complete the upgrade of the servers");
        }
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "OffsetsForTimes failed");
        }
        Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
        for (int idx = 0; idx < foundTimes.length; ++idx) {
            boolean hasTimestamp = foundTimes[idx] != this.marlinInternalDefaults.getNoTimestamp();
            if (hasTimestamp) {
                TopicPartition key = new TopicPartition(topicNames[idx], feedIds[idx]);
                result.put(key, new OffsetAndTimestamp(foundOffsets[idx], foundTimes[idx]));
            }
        }
        return result;
    }

    /**
     * Checks that the topic part of a pattern string compiles as a regular expression.
     * <p>
     * When the text after the last {@code '/'} contains a {@code ':'}, only what follows the
     * first such {@code ':'} is compiled; otherwise the whole string is compiled.
     *
     * @param pattern pattern string to check; must not be {@code null}
     * @return {@code true} when the selected text compiles, {@code false} when compiling throws
     */
    private boolean checkPatternValid(String pattern) {
        String topicRegex = pattern;
        int lastSlash = pattern.lastIndexOf('/');
        if (lastSlash >= 0) {
            String tail = pattern.substring(lastSlash + 1);
            int colon = tail.indexOf(':');
            if (colon >= 0) {
                topicRegex = tail.substring(colon + 1);
            }
        }
        try {
            Pattern.compile(topicRegex);
            return true;
        }
        catch (Exception e) {
            return false;
        }
    }

    // Class initialisation: ShimLoader.load() runs first, then the logger and the
    // 50 MiB (0x3200000) message-size constant are assigned.
    static {
        ShimLoader.load();
        LOG = LoggerFactory.getLogger(MarlinListenerImpl.class);
        DEFAULT_MAX_MSGS_BYTES = 50 * 1024 * 1024;
    }

    /**
     * Adapts a {@link TopicRefreshListListener} to the native list-refresh callback interface:
     * decodes the native data into topic partitions and forwards them to the listener.
     */
    public class MarlinTopicRefreshListListenerWrapperImpl
    implements MarlinJniListener.MarlinTopicRefreshListListenerWrapper {
        private TopicRefreshListListener listListener;

        /**
         * @param ll listener to notify; when {@code null}, updates are dropped
         */
        public MarlinTopicRefreshListListenerWrapperImpl(TopicRefreshListListener ll) {
            this.listListener = ll;
        }

        /**
         * Decodes every topic partition in {@code data} and passes the set to the listener.
         * Does nothing, and does not parse, when no listener is set.
         *
         * @param data native payload describing the updated topic partitions
         */
        public void updatedTopics(NativeData data) {
            if (this.listListener == null) {
                return;
            }
            Set<TopicPartition> updated = new HashSet<>();
            for (NativeDataParser parser = new NativeDataParser(data); parser.HasData(); ) {
                updated.add(parser.getNextTopicPartition());
            }
            this.listListener.updatedTopics(updated);
        }
    }

    /**
     * Adapts a {@link TopicRefreshRegexListener} to the native regex-refresh callback interface:
     * decodes the native data and forwards the distinct topic names to the listener.
     */
    public class MarlinTopicRefreshRegexListenerWrapperImpl
    implements MarlinJniListener.MarlinTopicRefreshRegexListenerWrapper {
        private TopicRefreshRegexListener regexListener;

        /**
         * @param rl listener to notify; when {@code null}, updates are dropped
         */
        public MarlinTopicRefreshRegexListenerWrapperImpl(TopicRefreshRegexListener rl) {
            this.regexListener = rl;
        }

        /**
         * Collects the topic name of every topic partition in {@code data} and passes the set
         * of names to the listener. Does nothing when no listener is set.
         *
         * @param data native payload describing the updated topic partitions
         */
        public void updatedTopics(NativeData data) {
            if (this.regexListener != null) {
                // The unused raw-typed ArrayList that used to be allocated here has been removed.
                NativeDataParser dataParser = new NativeDataParser(data);
                HashSet<String> topicNames = new HashSet<String>();
                while (dataParser.HasData()) {
                    TopicPartition tp = dataParser.getNextTopicPartition();
                    topicNames.add(tp.topic());
                }
                this.regexListener.updatedTopics(topicNames);
            }
        }
    }

    public class MarlinJoinCallbackWrapperImpl
    implements MarlinJniListener.MarlinJoinCallbackWrapper {
        private MarlinListener.MarlinJoinCallback joincb;

        public MarlinJoinCallbackWrapperImpl(MarlinListener.MarlinJoinCallback cb) {
            this.joincb = cb;
        }

        public void onJoin(byte[] data) {
            try {
                Marlinserver.JoinGroupInfo joinInfo = Marlinserver.JoinGroupInfo.parseFrom((byte[])data);
                this.joincb.onJoin(joinInfo);
            }
            catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
                return;
            }
        }

        public void onRejoin(byte[] data) {
            try {
                Marlinserver.JoinGroupInfo joinInfo = Marlinserver.JoinGroupInfo.parseFrom((byte[])data);
                this.joincb.onRejoin(joinInfo);
            }
            catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
                return;
            }
        }
    }

    /**
     * Adapts an {@link OffsetCommitCallback} and/or consumer interceptors to the native commit
     * callback interface.
     */
    public class MarlinCommitCallbackWrapperImpl
    implements MarlinJniListener.MarlinCommitCallbackWrapper {
        private OffsetCommitCallback commitcb;
        private ConsumerInterceptors<?, ?> interceptors;

        /**
         * @param cb   user commit callback, or {@code null}
         * @param ints consumer interceptors, or {@code null}
         */
        public MarlinCommitCallbackWrapperImpl(OffsetCommitCallback cb, ConsumerInterceptors<?, ?> ints) {
            this.commitcb = cb;
            this.interceptors = ints;
        }

        /**
         * Rebuilds the committed offset map and notifies the interceptors first, then the user
         * callback. Returns immediately when neither is set.
         *
         * @param data      native payload listing the committed topic partitions
         * @param offsets   committed offsets, index-aligned with the partitions in {@code data}
         * @param metadatas commit metadata, index-aligned with the partitions in {@code data}
         * @param errorCode native result code; converted with {@code jniErrToException} and
         *                  handed to the user callback
         */
        public void onComplete(NativeData data, long[] offsets, String[] metadatas, int errorCode) {
            boolean hasInterceptors = this.interceptors != null;
            boolean hasCallback = this.commitcb != null;
            if (!hasInterceptors && !hasCallback) {
                return;
            }
            HashMap<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
            NativeDataParser parser = MarlinListenerImpl.this.getNativeDataParser(data);
            for (int idx = 0; parser.HasData(); ++idx) {
                committed.put(parser.getNextTopicPartition(), new OffsetAndMetadata(offsets[idx], metadatas[idx]));
            }
            if (hasInterceptors) {
                this.interceptors.onCommit(committed);
            }
            if (hasCallback) {
                this.commitcb.onComplete(committed, (Exception)MarlinClient.jniErrToException(errorCode, null));
            }
        }
    }

    /**
     * Adapts a {@link ConsumerRebalanceListener} to the native rebalance callback interface:
     * decodes the native data into topic partitions and forwards them to the listener.
     */
    public class MarlinRebalanceCallbackWrapperImpl
    implements MarlinJniListener.MarlinRebalanceCallbackWrapper {
        private ConsumerRebalanceListener rebalanceListener;

        /**
         * @param rl listener to notify; when {@code null}, notifications are dropped
         */
        public MarlinRebalanceCallbackWrapperImpl(ConsumerRebalanceListener rl) {
            this.rebalanceListener = rl;
        }

        /**
         * Forwards newly assigned partitions to the listener; does nothing when no listener is set.
         *
         * @param data native payload listing the assigned topic partitions
         */
        public void onPartitionsAssigned(NativeData data) {
            if (this.rebalanceListener == null) {
                return;
            }
            this.rebalanceListener.onPartitionsAssigned(this.readFeeds(data));
        }

        /**
         * Forwards revoked partitions to the listener; does nothing when no listener is set.
         *
         * @param data native payload listing the revoked topic partitions
         */
        public void onPartitionsRevoked(NativeData data) {
            if (this.rebalanceListener == null) {
                return;
            }
            this.rebalanceListener.onPartitionsRevoked(this.readFeeds(data));
        }

        /** Decodes every topic partition in {@code data}, in parser order. */
        private ArrayList<TopicPartition> readFeeds(NativeData data) {
            ArrayList<TopicPartition> feeds = new ArrayList<>();
            for (NativeDataParser parser = new NativeDataParser(data); parser.HasData(); ) {
                feeds.add(parser.getNextTopicPartition());
            }
            return feeds;
        }
    }

    /**
     * Receives a packed list of topic names from the native layer: one concatenated string,
     * the length of each name, a partition count per topic and the number of topics.
     */
    public class MarlinStringArrayWrapperImpl
    implements MarlinJniListener.MarlinStringArrayWrapper {
        private String topicNames;
        private int[] topicNameSizes;
        private int[] numPartitions;
        private int numTopics;

        /**
         * Stores the packed values as given.
         *
         * @param tn  all topic names concatenated without separators
         * @param tns length of each name inside {@code tn}, in order
         * @param np  partition count per topic
         * @param nt  number of topics
         */
        public void SetStringArrayElements(String tn, int[] tns, int[] np, int nt) {
            this.topicNames = tn;
            this.topicNameSizes = tns;
            this.numPartitions = np;
            this.numTopics = nt;
        }

        /**
         * @return the unpacked topic names as a set
         */
        public Set<String> GetStringSet() {
            return this.unpackInto(new HashSet<String>());
        }

        /**
         * @return the unpacked topic names as a list, in packed order
         */
        public List<String> GetStringList() {
            return this.unpackInto(new ArrayList<String>(this.numTopics));
        }

        /**
         * @return the partition-count array exactly as stored (not copied)
         */
        public int[] GetNumPartitions() {
            return this.numPartitions;
        }

        /** Slices the concatenated string into {@code numTopics} names and adds them to {@code target}. */
        private <C extends Collection<String>> C unpackInto(C target) {
            int start = 0;
            for (int i = 0; i < this.numTopics; ++i) {
                int end = start + this.topicNameSizes[i];
                target.add(this.topicNames.substring(start, end));
                start = end;
            }
            return target;
        }
    }

    /**
     * Receives an offset and its metadata string from the native layer and exposes them as an
     * {@link OffsetAndMetadata}.
     */
    public class MarlinOffsetAndMetadataWrapperImpl
    implements MarlinJniListener.MarlinOffsetAndMetadataWrapper {
        private String metadata;
        private long offset;

        /**
         * Stores the values as given.
         *
         * @param m commit metadata
         * @param o committed offset
         */
        public void SetOffsetAndMetadata(String m, long o) {
            this.offset = o;
            this.metadata = m;
        }

        /**
         * @return a new {@link OffsetAndMetadata} built from the stored offset and metadata
         */
        OffsetAndMetadata getOffsetAndMetadata() {
            OffsetAndMetadata snapshot = new OffsetAndMetadata(this.offset, this.metadata);
            return snapshot;
        }
    }
}

