/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.fs.marlin.producer;

import com.mapr.fs.marlin.MarlinTopicInfo;
import com.mapr.fs.marlin.jni.MarlinJniProducer;
import com.mapr.fs.marlin.producer.MarlinFuture;
import com.mapr.fs.marlin.producer.MarlinProducerResult;
import com.mapr.fs.proto.Marlinserver;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MarlinPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinProducerImpl
extends MarlinJniProducer {
    /** Logger shared by the producer and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(MarlinProducerImpl.class);
    /** Work-queue count (4); the same value the constructor assigns to {@link #MaxWorkQueues}. */
    private static final int MAXWORKQUEUESIZE = 4;
    /** Source of producer ids; every constructed instance takes the next value. */
    private static final AtomicLong curProducerId = new AtomicLong(1L);
    /** Number of work queues / worker threads in use; reset to 0 at the end of close(). */
    private int MaxWorkQueues;
    /** One queue per worker thread; send() picks the queue for each record. */
    private List<WorkQueue> workQueues;
    /** Worker threads, index-aligned with {@link #workQueues}. */
    private List<Thread> workers;
    /** Value of the "parallel flushers per partition" configuration entry. */
    private boolean multiFlushers;
    /**
     * Set once by close(). It is written under the instance lock but read by send()
     * without any lock, so it is volatile to make the write visible to sending threads.
     */
    private volatile boolean shuttingdown;
    /** Counter send() uses to spread records across queues when multiFlushers applies. */
    private final AtomicLong counter = new AtomicLong(0L);
    /**
     * Number of records handed to the work queues and not yet reported back through
     * handleJniCallbacks(). The object is also used as the monitor close() waits on.
     */
    private final AtomicLong outstandingMsgs = new AtomicLong(0L);
    /** Cache of topic name to partition count, updated by handleJniTopicMetadata(). */
    private Map<String, Integer> topicNameToNumPartitions;
    /** Partitioner used by send() when the caller passes feed == -1. */
    private MarlinPartitioner partitioner;
    /** Id of this producer instance, passed to the native Send call. */
    private long producerId;
    /** Prefix that send() strips from topic names before looking up partitions. */
    private String defaultClusterPath;

    /**
     * Validates the producer settings this class relies on before any native resource
     * is created.
     *
     * <p>Each setting is read through the typed accessor of {@code config}; the numeric
     * settings are additionally range checked. Every failure is logged together with
     * its cause and then rethrown unchanged.
     *
     * @param config the producer configuration to validate
     * @throws KafkaException (as {@link ConfigException}) if a setting cannot be read or
     *         is out of range
     */
    private static void checkProducerConfig(ProducerConfig config) throws KafkaException {
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        try {
            // Read only for validation; the value itself is not needed here.
            config.getString(mConfDef.getClientID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid client id configuration", (Throwable)e);
            throw e;
        }
        try {
            // Read only for validation; the constructor reads the value again.
            config.getBoolean(mConfDef.getParallelFlushersPerPartition());
        }
        catch (ConfigException e) {
            LOG.error("Invalid marlin parallel flushers configuration", (Throwable)e);
            throw e;
        }
        try {
            long bufferMemory = config.getLong(mConfDef.getBufferMemory());
            if (bufferMemory <= 0L) {
                throw new ConfigException("Buffer memory (" + bufferMemory + ") must be a positive number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid buffer memory configuration", (Throwable)e);
            throw e;
        }
        try {
            long bufferTime = config.getLong(mConfDef.getBufferTime());
            if (bufferTime < 0L) {
                throw new ConfigException("buffer ms (" + bufferTime + ") cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid marlin buffer time configuration", (Throwable)e);
            throw e;
        }
        try {
            long metadataAge = config.getLong(mConfDef.getMetadataMaxAge());
            if (metadataAge < 0L) {
                throw new ConfigException("metadata max age (" + metadataAge + ") cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid metadata max age configuration", (Throwable)e);
            throw e;
        }
    }

    /**
     * Creates a producer: validates the configuration, opens the native producer and
     * starts one worker thread per work queue.
     *
     * @param config the producer configuration
     * @throws KafkaException if the configuration is invalid or the native producer
     *         could not be created
     */
    public MarlinProducerImpl(ProducerConfig config) throws KafkaException {
        MarlinProducerImpl.checkProducerConfig(config);
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        this.multiFlushers = config.getBoolean(mConfDef.getParallelFlushersPerPartition());
        this.partitioner = (MarlinPartitioner)config.getConfiguredInstance(mConfDef.getPartitioner(), MarlinPartitioner.class);
        this.shuttingdown = false;
        this.MaxWorkQueues = MAXWORKQUEUESIZE;
        // Create all Java-side state before the native producer exists and before any
        // worker runs, so nothing that fires early (e.g. a topic metadata callback, if
        // the native layer can issue one at this point) observes a null collection.
        this.topicNameToNumPartitions = new ConcurrentHashMap<String, Integer>();
        this.workQueues = new ArrayList<WorkQueue>(this.MaxWorkQueues);
        this.workers = new ArrayList<Thread>(this.MaxWorkQueues);
        this._clntPtr = this.OpenProducer(config);
        this.producerId = curProducerId.getAndIncrement();
        if (this._clntPtr == 0L) {
            throw new KafkaException("Could not create MarlinProducer");
        }
        this.defaultClusterPath = this.GetDefaultClusterPath(this._clntPtr);
        for (int i = 0; i < this.MaxWorkQueues; ++i) {
            WorkQueue wq = new WorkQueue(1024);
            this.workQueues.add(wq);
            Thread worker = new Thread(new WorkerThread(wq, i));
            this.workers.add(worker);
            worker.start();
        }
    }

    /**
     * Queues one record for asynchronous delivery by a worker thread.
     *
     * <p>When {@code feed} is -1 the default cluster path prefix is stripped from
     * {@code topic} and the configured partitioner chooses the feed. Failures detected
     * here (producer shutting down, empty value, partition lookup failure, queue
     * rejection) are reported through {@code callback} (when non-null) and through the
     * returned future; they are not thrown.
     *
     * @param topic topic name
     * @param feed target feed (partition), or -1 to let the partitioner decide
     * @param keyObj key object handed to the partitioner
     * @param key serialized key, may be null
     * @param valueObj value object handed to the partitioner
     * @param value serialized value; must be non-null and non-empty
     * @param callback completion callback, may be null
     * @return a future for the record metadata; already failed if the record was rejected
     */
    public Future<RecordMetadata> send(String topic, int feed, Object keyObj, byte[] key, Object valueObj, byte[] value, Callback callback) throws KafkaException {
        if (this.shuttingdown) {
            LOG.error("Cannot send message when producer is shutting down.");
            return this.failSend(topic, callback, new RuntimeException("Producer is shutting down. Cannot send."));
        }
        if (value == null || value.length == 0) {
            LOG.error("Cannot send message without any value");
            return this.failSend(topic, callback, new IllegalArgumentException("Value is empty"));
        }
        if (feed == -1) {
            if (topic.startsWith(this.defaultClusterPath)) {
                topic = topic.substring(this.defaultClusterPath.length());
            }
            int numParts;
            try {
                numParts = this.getNumPartitions(topic);
            }
            catch (KafkaException e) {
                LOG.error("Cannot send message, got an error while fetching number of partitions " + e);
                // Same exception type and message as before, now keeping the cause.
                return this.failSend(topic, callback, new IllegalArgumentException(e.toString(), (Throwable)e));
            }
            feed = this.partitioner.partition(topic, keyObj, key, valueObj, value, numParts);
        }
        // close() resets MaxWorkQueues to 0 when it finishes; read it once and refuse
        // instead of dividing by zero below.
        int numQueues = this.MaxWorkQueues;
        if (numQueues <= 0) {
            LOG.error("Cannot send message when producer is shutting down.");
            return this.failSend(topic, callback, new RuntimeException("Producer is shutting down. Cannot send."));
        }
        MarlinProducerResult result = new MarlinProducerResult(topic, feed, callback);
        ProducerRecordJob job = new ProducerRecordJob(result, key, value);
        int qid;
        if (this.multiFlushers && feed == -1 && key == null) {
            // Mask the sign bit so the index stays valid after the counter passes 2^31.
            qid = (int)((this.counter.getAndIncrement() & Long.MAX_VALUE) % (long)numQueues);
        } else {
            int queueNumber = topic.length();
            if (feed > 0) {
                queueNumber += feed;
            }
            // Same mapping as before for ordinary values; the mask only matters if the
            // sum overflowed to a negative number.
            qid = (queueNumber & Integer.MAX_VALUE) % numQueues;
        }
        // Count the record before a worker can see it; otherwise the completion
        // callback could decrement the counter ahead of this increment.
        this.outstandingMsgs.incrementAndGet();
        try {
            this.workQueues.get(qid).enqueue(job);
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
            // The job never reached the queue: undo the increment and wake close()
            // if that brought the count to zero.
            if (this.outstandingMsgs.decrementAndGet() == 0L) {
                synchronized (this.outstandingMsgs) {
                    if (this.outstandingMsgs.get() == 0L) {
                        this.outstandingMsgs.notifyAll();
                    }
                }
            }
            return this.failSend(topic, callback, e);
        }
        return new MarlinFuture(result);
    }

    /**
     * Reports a rejected send: invokes the callback (when present) with placeholder
     * metadata and returns a future that fails with the same exception.
     */
    private Future<RecordMetadata> failSend(String topic, Callback callback, Exception cause) {
        if (callback != null) {
            callback.onCompletion(new RecordMetadata(new TopicPartition(topic, -1), -1L, 0L), cause);
        }
        return new FutureFailure(cause);
    }

    /**
     * Drains every work queue and then flushes the native producer.
     *
     * <p>A flush marker is pushed through each queue in turn and this call waits for
     * the owning worker to reach it before moving to the next queue.
     *
     * @throws KafkaException if the native flush reports a non-zero status
     */
    public void flush() throws KafkaException {
        int queueIdx = 0;
        while (queueIdx < this.MaxWorkQueues) {
            WorkQueue queue = this.workQueues.get(queueIdx);
            queue.enqueueFlushAndWait();
            queueIdx++;
        }
        if (this.Flush(this._clntPtr) != 0) {
            throw new KafkaException("could not flush to MarlinProducer");
        }
    }

    /**
     * Completes a batch of records: marks each result done, invokes its callback and
     * reduces the outstanding-record count, waking close() when it reaches zero.
     *
     * <p>No exception leaves this method. A failure while completing one record is
     * logged and does not stop the remaining records from being completed, and the
     * outstanding-record count is reduced by {@code length} even if completion failed,
     * so close() is not left waiting for records that were already reported.
     *
     * @param length number of entries of {@code offsets} and {@code results} to process
     * @param offsets offset per record
     * @param results result object per record; null entries are logged and skipped
     * @param feedid feed (partition) the batch belongs to
     * @param errorCode status code, translated with jniErrToException
     */
    public void handleJniCallbacks(int length, long[] offsets, MarlinProducerResult[] results, int feedid, int errorCode) {
        LOG.debug("Handling callback for " + length);
        try {
            try {
                for (int i = 0; i < length; ++i) {
                    MarlinProducerResult result = results[i];
                    if (result == null) {
                        LOG.error("producer.handleJniCallbacks() got null as result");
                        continue;
                    }
                    try {
                        result.done(result.getTopic(), feedid, offsets[i], this.jniErrToException(errorCode));
                        if (result.callback() != null) {
                            result.callback().onCompletion(result.getRecordMetadata(), (Exception)result.error());
                        }
                    }
                    catch (Exception e) {
                        // Typically a user callback that threw; keep completing the rest.
                        LOG.error("producer.handleJniCallbacks() exception while completing result " + i, (Throwable)e);
                    }
                }
            }
            finally {
                long prevValue = this.outstandingMsgs.getAndAdd(-1 * length);
                if (prevValue < (long)length) {
                    LOG.error("outstandingMsgs are " + prevValue + " but we got results for " + length);
                }
                if (this.outstandingMsgs.get() == 0L) {
                    synchronized (this.outstandingMsgs) {
                        // Re-check under the monitor that close() waits on.
                        if (this.outstandingMsgs.get() == 0L) {
                            this.outstandingMsgs.notifyAll();
                        }
                    }
                }
            }
        }
        catch (Throwable e) {
            LOG.error("Hitting exception during handleJniCallbacks", e);
        }
    }

    /**
     * Applies a batch of topic metadata changes to the partition-count cache.
     *
     * <p>{@code topicNames} holds the topic names back to back; entry {@code i} is
     * {@code topicNameSizes[i]} characters long. A partition count of 0 removes the
     * topic from the cache, any other count replaces the cached value. No exception
     * leaves this method; failures are only logged.
     *
     * @param topicNames concatenated topic names
     * @param topicNameSizes length of each name inside {@code topicNames}
     * @param numPartitionsForTopic partition count per topic
     * @param arraySize number of topics described
     */
    public void handleJniTopicMetadata(String topicNames, int[] topicNameSizes, int[] numPartitionsForTopic, int arraySize) {
        LOG.debug("Handling topic metadata changes for " + arraySize + " topics");
        try {
            int start = 0;
            int idx = 0;
            while (idx < arraySize) {
                int end = start + topicNameSizes[idx];
                String name = topicNames.substring(start, end);
                start = end;
                int partitions = numPartitionsForTopic[idx];
                if (partitions != 0) {
                    this.topicNameToNumPartitions.put(name, partitions);
                } else {
                    this.topicNameToNumPartitions.remove(name);
                }
                idx++;
            }
        }
        catch (Throwable e) {
            LOG.error("Hitting exception during handleJniTopicMetadata");
            LOG.error("Exception " + e);
        }
    }

    /**
     * Returns the partition count of a topic, consulting the cache first and asking
     * the native producer (then caching the answer) on a miss.
     *
     * @param topic topic name
     * @return number of partitions of the topic
     * @throws KafkaException (as {@link UnknownTopicOrPartitionException}) if the
     *         native lookup returns a negative value
     */
    private int getNumPartitions(String topic) throws KafkaException {
        Integer cached = this.topicNameToNumPartitions.get(topic);
        if (cached != null) {
            return cached;
        }
        int fetched = this.GetTopicInfo(this._clntPtr, topic);
        if (fetched < 0) {
            // The negated return value is reported as the error number.
            throw new UnknownTopicOrPartitionException("could not get TopicInfo, err " + -fetched);
        }
        this.topicNameToNumPartitions.put(topic, fetched);
        return fetched;
    }

    /**
     * Returns Kafka partition descriptors for a topic.
     *
     * @param topic topic name
     * @return the partition info list produced by {@link MarlinTopicInfo} for the
     *         topic and its partition count
     * @throws KafkaException if the partition count cannot be determined
     */
    public List<PartitionInfo> getTopicInfo(String topic) throws KafkaException {
        int partitionCount = this.getNumPartitions(topic);
        return new MarlinTopicInfo(topic, partitionCount).getKafkaPartitionInfo();
    }

    /**
     * Shuts the producer down: flushes, stops the work queues, waits for outstanding
     * records to be reported, joins the worker threads and closes the native producer.
     *
     * <p>Only the first call does the work; later calls return immediately. If the
     * calling thread is interrupted while waiting for outstanding records, the wait is
     * abandoned, shutdown continues, and the interrupt status is restored before this
     * method returns.
     *
     * <p>NOTE(review): if flush() throws, the exception propagates from here with the
     * queues still running and the native producer still open, and a second close()
     * returns early because the shutdown flag is already set. Confirm whether that is
     * intended.
     *
     * @throws InterruptException if the thread is interrupted while joining a worker
     */
    public void close() {
        LOG.debug("Producer is shutting down");
        synchronized (this) {
            if (this.shuttingdown) {
                LOG.debug("Producer shutdown already in progress");
                return;
            }
            this.shuttingdown = true;
        }
        LOG.debug("Producer actually shutting down");
        this.flush();
        for (int i = 0; i < this.MaxWorkQueues; ++i) {
            this.workQueues.get(i).shutdown();
        }
        boolean interrupted = false;
        synchronized (this.outstandingMsgs) {
            // Loop on the condition: Object.wait() may return without a notification.
            while (this.outstandingMsgs.get() != 0L) {
                try {
                    this.outstandingMsgs.wait();
                }
                catch (InterruptedException e) {
                    LOG.error("producer.close() exception while waiting on outstanding messages " + e);
                    interrupted = true;
                    break;
                }
            }
        }
        try {
            for (int i = 0; i < this.MaxWorkQueues; ++i) {
                try {
                    this.workers.get(i).join();
                }
                catch (InterruptedException e) {
                    throw new InterruptException(e);
                }
            }
            LOG.debug("Calling jni producer close");
            this.CloseProducer(this._clntPtr);
            LOG.debug("Returned from jni producer close");
            this.MaxWorkQueues = 0;
            this._clntPtr = 0L;
        }
        finally {
            if (interrupted) {
                // wait() cleared the flag when it threw; hand it back to the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    private class WorkerThread
    implements Runnable {
        private WorkQueue wq;
        private int id;
        private static final int MaxSends = 100;
        private int maxBytesToSend = 50000;

        public WorkerThread(WorkQueue wq, int id) {
            this.id = id;
            this.wq = wq;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ArrayList<ProducerRecordJob> flushJobs = new ArrayList<ProducerRecordJob>();
            int[] feedIDs = new int[100];
            int[] byteOffsets = new int[300];
            ProducerRecordJob[] recList = new ProducerRecordJob[100];
            byte[] toSend = new byte[this.maxBytesToSend];
            try {
                while (true) {
                    int recSz;
                    if ((recSz = this.wq.dequeue(100, recList)) == 0) {
                        LOG.debug("Worker thread " + this.id + " joining");
                        return;
                    }
                    LOG.debug("Worker thread " + this.id + " got " + recSz + " jobs");
                    MarlinProducerResult[] results = new MarlinProducerResult[recSz];
                    int numMsgJobs = 0;
                    int byteSize = 0;
                    for (int i = 0; i < recSz; ++i) {
                        ProducerRecordJob rec = recList[i];
                        if (rec.isFlushJob()) {
                            flushJobs.add(rec);
                            continue;
                        }
                        feedIDs[numMsgJobs] = rec.getResult().getFeed();
                        results[numMsgJobs] = rec.getResult();
                        byteOffsets[numMsgJobs * 3] = byteSize += rec.getTopic().length;
                        if (rec.getKey() != null) {
                            byteSize += rec.getKey().length;
                        }
                        byteOffsets[numMsgJobs * 3 + 1] = byteSize;
                        byteOffsets[numMsgJobs * 3 + 2] = byteSize += rec.getValue().length;
                        ++numMsgJobs;
                    }
                    if (this.maxBytesToSend < byteSize) {
                        LOG.debug("Worker thread increasing byte array from " + this.maxBytesToSend + " to " + byteSize);
                        toSend = new byte[byteSize];
                        this.maxBytesToSend = byteSize;
                    }
                    ByteBuffer toSendBuffer = ByteBuffer.wrap(toSend);
                    for (int i = 0; i < recSz; ++i) {
                        ProducerRecordJob rec = recList[i];
                        if (rec.isFlushJob()) continue;
                        toSendBuffer.put(rec.getTopic());
                        if (rec.getKey() != null) {
                            toSendBuffer.put(rec.getKey());
                        }
                        toSendBuffer.put(rec.getValue());
                    }
                    assert (recSz == numMsgJobs + flushJobs.size());
                    if (numMsgJobs > 0) {
                        try {
                            int err = MarlinProducerImpl.this.Send(MarlinProducerImpl.this._clntPtr, MarlinProducerImpl.this.producerId, toSend, byteSize, byteOffsets, feedIDs, results, numMsgJobs);
                            if (err != 0) {
                                throw new KafkaException("could not send to MarlinProducer");
                            }
                        }
                        catch (ApiException e) {
                            LOG.error("Exception occurred during message send:", (Throwable)e);
                            for (int i = 0; i < recSz; ++i) {
                                if (recList[i].isFlushJob()) continue;
                                MarlinProducerResult result = recList[i].getResult();
                                result.done(result.getTopic(), result.getFeed(), -1L, (RuntimeException)((Object)e));
                                if (result.callback() == null) continue;
                                result.callback().onCompletion(result.getRecordMetadata(), (Exception)result.error());
                            }
                        }
                    }
                    for (int i = 0; i < flushJobs.size(); ++i) {
                        ProducerRecordJob notifyJob = (ProducerRecordJob)flushJobs.get(i);
                        notifyJob.markFlushJobDone();
                        ProducerRecordJob producerRecordJob = notifyJob;
                        synchronized (producerRecordJob) {
                            notifyJob.notify();
                            continue;
                        }
                    }
                    flushJobs.clear();
                }
            }
            catch (Exception e) {
                LOG.error(e.getMessage(), (Throwable)e);
                return;
            }
        }
    }

    /**
     * Bounded blocking queue of jobs backed by a circular array. Jobs are written at
     * {@code headIdx} and read from {@code tailIdx}; one slot is always left unused so
     * that "full" and "empty" can be told apart, i.e. at most {@code capacity - 1} jobs
     * are held. All state is guarded by the queue's own monitor.
     */
    private class WorkQueue {
        private ProducerRecordJob[] precords;
        private int capacity;
        private int tailIdx;
        private int headIdx;
        private boolean shuttingdown;

        public WorkQueue(int maxSz) {
            this.capacity = maxSz;
            this.precords = new ProducerRecordJob[this.capacity];
            this.headIdx = 0;
            this.tailIdx = 0;
            this.shuttingdown = false;
        }

        /** Marks the queue as shut down and wakes every thread waiting on it. */
        public synchronized void shutdown() {
            this.shuttingdown = true;
            this.notifyAll();
        }

        private boolean isEmpty() {
            return this.tailIdx == this.headIdx;
        }

        /** True when advancing headIdx by one would make it equal to tailIdx. */
        private boolean isFull() {
            return this.headIdx == this.tailIdx - 1 || this.headIdx == this.capacity - 1 && this.tailIdx == 0;
        }

        private int size() {
            if (this.isEmpty()) {
                return 0;
            }
            if (this.tailIdx < this.headIdx) {
                return this.headIdx - this.tailIdx;
            }
            // headIdx has wrapped around the end of the array.
            return this.capacity - this.tailIdx + this.headIdx;
        }

        /**
         * Enqueues a flush marker and blocks until the worker has marked it done.
         *
         * <p>If the waiting thread is interrupted, the wait is abandoned, the event is
         * logged and the thread's interrupt status is restored.
         */
        public void enqueueFlushAndWait() {
            ProducerRecordJob flushJob = new ProducerRecordJob();
            // The marker's monitor is taken before the marker becomes visible to the
            // worker, so the worker's notify cannot happen before this thread waits.
            synchronized (flushJob) {
                this.enqueue(flushJob);
                try {
                    while (!flushJob.getFlushJobDone()) {
                        flushJob.wait();
                    }
                }
                catch (InterruptedException e) {
                    LOG.error("enqueueFlushAndWait flushJob.wait() encountered " + e);
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Adds a job, blocking while the queue is full.
         *
         * <p>An interrupt while blocked is logged and the wait continues; the interrupt
         * status is restored when the method exits.
         *
         * @param rec the job to add
         * @throws RuntimeException if the queue has been shut down, either before the
         *         call or while it was blocked waiting for space
         */
        public synchronized void enqueue(ProducerRecordJob rec) {
            boolean interrupted = false;
            try {
                while (true) {
                    // Checked on every pass: a job added after shutdown could be left
                    // behind if the worker has already drained the queue and exited.
                    if (this.shuttingdown) {
                        LOG.error("enqueue job failed:  producer shutting down");
                        throw new RuntimeException("enqueue job failed:  producer shutting down");
                    }
                    if (!this.isFull()) {
                        break;
                    }
                    try {
                        this.wait();
                    }
                    catch (InterruptedException e) {
                        LOG.error("enqueue encountered " + e);
                        interrupted = true;
                    }
                }
                this.precords[this.headIdx] = rec;
                this.headIdx = (this.headIdx + 1) % this.capacity;
                assert (this.headIdx != this.tailIdx);
                // Producers and the consumer wait on the same monitor; wake them all so
                // the notification cannot be consumed by a thread that cannot proceed.
                this.notifyAll();
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private ProducerRecordJob dequeueInternal() {
            assert (!this.isEmpty());
            ProducerRecordJob rec = this.precords[this.tailIdx];
            this.precords[this.tailIdx] = null;
            this.tailIdx = (this.tailIdx + 1) % this.capacity;
            return rec;
        }

        /**
         * Removes up to {@code maxSz} jobs, blocking while the queue is empty and not
         * shut down.
         *
         * @param maxSz maximum number of jobs to remove
         * @param retList array receiving the removed jobs, starting at index 0
         * @return number of jobs stored in {@code retList}; 0 only when the queue is
         *         shut down and empty
         */
        public synchronized int dequeue(int maxSz, ProducerRecordJob[] retList) {
            while (this.isEmpty() && !this.shuttingdown) {
                try {
                    this.wait();
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored, as before: the loop condition decides
                    // whether the worker keeps waiting.
                }
            }
            int retSz = this.size();
            assert (this.shuttingdown || retSz > 0);
            if (retSz > maxSz) {
                retSz = maxSz;
            }
            if (retSz > 0) {
                int idx;
                for (idx = 0; idx < retSz; ++idx) {
                    retList[idx] = this.dequeueInternal();
                }
                assert (idx == retSz);
                // Several producers may be blocked on a full queue; wake all of them.
                this.notifyAll();
            }
            return retSz;
        }
    }

    /**
     * One unit of work for a worker thread: either a record to send (topic, optional
     * key, value and the result object to complete) or a flush marker that carries no
     * record and is only used to signal that the queue was drained up to it.
     */
    private class ProducerRecordJob {
        private final boolean flushJob;
        private boolean flushJobDone;
        private final MarlinProducerResult result;
        private final byte[] topic;
        private final byte[] key;
        private final byte[] value;

        /**
         * Creates a record job.
         *
         * @param res result object for the record; its topic name is encoded here
         * @param k serialized key, may be null
         * @param v serialized value
         */
        public ProducerRecordJob(MarlinProducerResult res, byte[] k, byte[] v) {
            this.flushJob = false;
            this.flushJobDone = false;
            this.result = res;
            // Encode with a fixed charset so the bytes do not depend on the JVM's
            // platform default.
            this.topic = res.getTopic().getBytes(StandardCharsets.UTF_8);
            this.key = k;
            this.value = v;
        }

        /** Creates a flush marker; it has no result, topic, key or value. */
        public ProducerRecordJob() {
            this.flushJob = true;
            this.flushJobDone = false;
            this.result = null;
            this.topic = null;
            this.key = null;
            this.value = null;
        }

        public boolean isFlushJob() {
            return this.flushJob;
        }

        public boolean getFlushJobDone() {
            return this.flushJobDone;
        }

        public void markFlushJobDone() {
            this.flushJobDone = true;
        }

        public byte[] getTopic() {
            return this.topic;
        }

        public byte[] getKey() {
            return this.key;
        }

        public byte[] getValue() {
            return this.value;
        }

        public MarlinProducerResult getResult() {
            return this.result;
        }
    }

    /**
     * A future that is already complete and always fails: both {@code get} variants
     * throw the {@link ExecutionException} wrapping the exception given at
     * construction. It cannot be cancelled.
     */
    private static class FutureFailure
    implements Future<RecordMetadata> {
        /** Created once in the constructor and rethrown by every get call. */
        private final ExecutionException failure;

        public FutureFailure(Exception exception) {
            this.failure = new ExecutionException(exception);
        }

        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean cancel(boolean interrupt) {
            return false;
        }

        @Override
        public RecordMetadata get() throws ExecutionException {
            throw this.failure;
        }

        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
            // The outcome is already known, so the timeout is irrelevant.
            return this.get();
        }
    }
}

