package com.mapr.streams.impl.producer;

import com.mapr.fs.jni.MarlinJniProducer;
import com.mapr.fs.jni.MarlinProducerResult;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.MarlinClient;
import com.mapr.streams.impl.MarlinTopicInfo;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.StreamsPartitioner;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mapr/streams/impl/producer/MarlinProducerImpl.class */
public class MarlinProducerImpl extends MarlinJniProducer {
    private static final int MAX_WORK_QUEUE_SIZE = 4;
    private static final int SEND_BATCH_SIZE = 100;
    private int MaxWorkQueues;
    private List<WorkQueue> workQueues;
    private List<Thread> workers;
    private boolean multiFlushers;
    private boolean shuttingdown;
    private final AtomicLong counter = new AtomicLong(0);
    private final AtomicLong outstandingMsgs = new AtomicLong(0);
    private Map<String, Integer> topicNameToNumPartitions;
    private StreamsPartitioner partitioner;
    private long producerId;
    private String defaultClusterPath;
    private String defaultStreamName;
    private static final Logger LOG = LoggerFactory.getLogger(MarlinProducerImpl.class);
    private static final AtomicLong curProducerId = new AtomicLong(1);

    /* loaded from: input_file:com/mapr/streams/impl/producer/MarlinProducerImpl$FutureFailure.class */
    private static class FutureFailure implements Future<RecordMetadata> {
        private final ExecutionException exception;

        public FutureFailure(Exception exc) {
            this.exception = new ExecutionException(exc);
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public RecordMetadata get(long j, TimeUnit timeUnit) throws ExecutionException {
            throw this.exception;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/impl/producer/MarlinProducerImpl$ProducerRecordJob.class */
    public class ProducerRecordJob {
        private boolean flushJob = false;
        private boolean flushJobDone = false;
        private MarlinProducerResultImpl result;
        private byte[] topic;
        private byte[] key;
        private byte[] value;

        public ProducerRecordJob(MarlinProducerResultImpl marlinProducerResultImpl, byte[] bArr, byte[] bArr2) {
            this.result = marlinProducerResultImpl;
            this.key = bArr;
            this.value = bArr2;
        }

        public ProducerRecordJob() {
        }

        public boolean isFlushJob() {
            return this.flushJob;
        }

        public boolean getFlushJobDone() {
            return this.flushJobDone;
        }

        public void markFlushJobDone() {
            this.flushJobDone = true;
        }

        public byte[] getTopic() {
            if (this.topic == null && !this.flushJob) {
                this.topic = this.result.getTopic().getBytes();
            }
            return this.topic;
        }

        public byte[] getKey() {
            return this.key;
        }

        public byte[] getValue() {
            return this.value;
        }

        public MarlinProducerResultImpl getResult() {
            return this.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/impl/producer/MarlinProducerImpl$WorkQueue.class */
    public class WorkQueue {
        private ProducerRecordJob[] precords;
        private int capacity;
        static final /* synthetic */ boolean $assertionsDisabled;
        private int headIdx = 0;
        private int tailIdx = 0;
        private boolean shuttingdown = false;
        private boolean notifyOnDequeue = false;
        private boolean notifyOnEnqueue = false;

        public WorkQueue(int i) {
            this.capacity = i;
            this.precords = new ProducerRecordJob[this.capacity];
        }

        public synchronized void shutdown() {
            this.shuttingdown = true;
            notifyAll();
        }

        private boolean isEmpty() {
            return this.tailIdx == this.headIdx;
        }

        private boolean isFull() {
            return this.headIdx == this.tailIdx - 1 || (this.headIdx == this.capacity - 1 && this.tailIdx == 0);
        }

        private int size() {
            if (isEmpty()) {
                return 0;
            }
            return this.tailIdx < this.headIdx ? this.headIdx - this.tailIdx : (this.capacity - this.tailIdx) + this.headIdx;
        }

        public void enqueueFlushAndWait() {
            ProducerRecordJob producerRecordJob = new ProducerRecordJob();
            synchronized (producerRecordJob) {
                enqueue(producerRecordJob);
                while (!producerRecordJob.getFlushJobDone()) {
                    try {
                        producerRecordJob.wait();
                    } catch (Exception e) {
                        MarlinProducerImpl.LOG.error("enqueueFlushAndWait flushJob.wait() encountered " + e);
                    }
                }
            }
        }

        public synchronized void enqueue(ProducerRecordJob producerRecordJob) {
            if (this.shuttingdown) {
                MarlinProducerImpl.LOG.error("enqueue job failed:  producer shutting down");
                throw new RuntimeException("enqueue job failed:  producer shutting down");
            }
            while (isFull()) {
                try {
                    this.notifyOnDequeue = true;
                    wait();
                } catch (Exception e) {
                    MarlinProducerImpl.LOG.error("enqueue encountered " + e);
                }
            }
            this.notifyOnDequeue = false;
            this.precords[this.headIdx] = producerRecordJob;
            this.headIdx = (this.headIdx + 1) % this.capacity;
            if (!$assertionsDisabled && this.headIdx == this.tailIdx) {
                throw new AssertionError();
            }
            if (this.notifyOnEnqueue) {
                notify();
            }
        }

        private ProducerRecordJob dequeueInternal() {
            if (!$assertionsDisabled && isEmpty()) {
                throw new AssertionError();
            }
            ProducerRecordJob producerRecordJob = this.precords[this.tailIdx];
            this.precords[this.tailIdx] = null;
            this.tailIdx = (this.tailIdx + 1) % this.capacity;
            return producerRecordJob;
        }

        public synchronized int dequeue(ProducerRecordJob[] producerRecordJobArr) {
            while (isEmpty() && !this.shuttingdown) {
                try {
                    this.notifyOnEnqueue = true;
                    wait();
                } catch (Exception e) {
                }
            }
            this.notifyOnEnqueue = false;
            if (size() < MarlinProducerImpl.SEND_BATCH_SIZE && !this.shuttingdown) {
                try {
                    wait(1L);
                } catch (Exception e2) {
                }
            }
            int size = size();
            if (!$assertionsDisabled && !this.shuttingdown && size <= 0) {
                throw new AssertionError();
            }
            if (size > MarlinProducerImpl.SEND_BATCH_SIZE) {
                size = MarlinProducerImpl.SEND_BATCH_SIZE;
            }
            if (size > 0) {
                int i = 0;
                while (i < size) {
                    producerRecordJobArr[i] = dequeueInternal();
                    i++;
                }
                if (!$assertionsDisabled && i != size) {
                    throw new AssertionError();
                }
                notify();
            }
            return size;
        }

        static {
            $assertionsDisabled = !MarlinProducerImpl.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/mapr/streams/impl/producer/MarlinProducerImpl$WorkerThread.class */
    private class WorkerThread implements Runnable {
        private WorkQueue wq;
        private int id;
        private int maxBytesToSend = 50000;
        static final /* synthetic */ boolean $assertionsDisabled;

        public WorkerThread(WorkQueue workQueue, int i) {
            this.id = i;
            this.wq = workQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            ArrayList arrayList = new ArrayList();
            int[] iArr = new int[MarlinProducerImpl.SEND_BATCH_SIZE];
            int[] iArr2 = new int[400];
            ProducerRecordJob[] producerRecordJobArr = new ProducerRecordJob[MarlinProducerImpl.SEND_BATCH_SIZE];
            byte[] bArr = new byte[this.maxBytesToSend];
            while (true) {
                try {
                    int dequeue = this.wq.dequeue(producerRecordJobArr);
                    if (dequeue == 0) {
                        MarlinProducerImpl.LOG.debug("Worker thread " + this.id + " joining");
                        return;
                    }
                    MarlinProducerImpl.LOG.debug("Worker thread " + this.id + " got " + dequeue + " jobs");
                    MarlinProducerResultImpl[] marlinProducerResultImplArr = new MarlinProducerResultImpl[dequeue];
                    int i = 0;
                    int i2 = 0;
                    for (int i3 = 0; i3 < dequeue; i3++) {
                        ProducerRecordJob producerRecordJob = producerRecordJobArr[i3];
                        if (producerRecordJob.isFlushJob()) {
                            arrayList.add(producerRecordJob);
                        } else {
                            iArr[i] = producerRecordJob.getResult().getFeed();
                            marlinProducerResultImplArr[i] = producerRecordJob.getResult();
                            int length = i2 + producerRecordJob.getTopic().length;
                            iArr2[i * 3] = length;
                            if (producerRecordJob.getKey() != null) {
                                length += producerRecordJob.getKey().length;
                            }
                            iArr2[(i * 3) + 1] = length;
                            i2 = length + producerRecordJob.getValue().length;
                            iArr2[(i * 3) + 2] = i2;
                            i++;
                        }
                    }
                    if (this.maxBytesToSend < i2) {
                        MarlinProducerImpl.LOG.debug("Worker thread increasing byte array from " + this.maxBytesToSend + " to " + i2);
                        bArr = new byte[i2];
                        this.maxBytesToSend = i2;
                    }
                    ByteBuffer wrap = ByteBuffer.wrap(bArr);
                    for (int i4 = 0; i4 < dequeue; i4++) {
                        ProducerRecordJob producerRecordJob2 = producerRecordJobArr[i4];
                        if (!producerRecordJob2.isFlushJob()) {
                            wrap.put(producerRecordJob2.getTopic());
                            if (producerRecordJob2.getKey() != null) {
                                wrap.put(producerRecordJob2.getKey());
                            }
                            wrap.put(producerRecordJob2.getValue());
                        }
                    }
                    if (!$assertionsDisabled && dequeue != i + arrayList.size()) {
                        throw new AssertionError();
                    }
                    if (i > 0) {
                        try {
                            int Send = MarlinProducerImpl.this.Send(MarlinProducerImpl.this._clntPtr, MarlinProducerImpl.this.producerId, bArr, i2, iArr2, iArr, marlinProducerResultImplArr, i);
                            if (Send != 0) {
                                throw MarlinClient.jniErrToException(Send, "could not send to MarlinProducer");
                                break;
                            }
                        } catch (ApiException e) {
                            MarlinProducerImpl.LOG.error("Exception occurred during message send:", e);
                            for (int i5 = 0; i5 < dequeue; i5++) {
                                if (!producerRecordJobArr[i5].isFlushJob()) {
                                    MarlinProducerResultImpl result = producerRecordJobArr[i5].getResult();
                                    result.done(result.getFeed(), -1L, e);
                                    result.onCompletion();
                                }
                            }
                        }
                    }
                    for (int i6 = 0; i6 < arrayList.size(); i6++) {
                        ProducerRecordJob producerRecordJob3 = (ProducerRecordJob) arrayList.get(i6);
                        producerRecordJob3.markFlushJobDone();
                        synchronized (producerRecordJob3) {
                            producerRecordJob3.notify();
                        }
                    }
                    arrayList.clear();
                } catch (Exception e2) {
                    MarlinProducerImpl.LOG.error(e2.getMessage(), e2);
                    return;
                }
            }
        }

        static {
            $assertionsDisabled = !MarlinProducerImpl.class.desiredAssertionStatus();
        }
    }

    private static void checkProducerConfig(ProducerConfig producerConfig) throws KafkaException {
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        try {
            producerConfig.getString(defaultInstance.getClientID());
            try {
                producerConfig.getBoolean(defaultInstance.getParallelFlushersPerPartition());
                try {
                    long longValue = producerConfig.getLong(defaultInstance.getBufferMemory()).longValue();
                    if (longValue <= 0) {
                        throw new ConfigException("Buffer memory (" + longValue + ") must be a positive number");
                    }
                    try {
                        long longValue2 = producerConfig.getLong(defaultInstance.getBufferTime()).longValue();
                        if (longValue2 < 0) {
                            throw new ConfigException("buffer ms (" + longValue2 + ") cannot be negative number");
                        }
                        try {
                            long longValue3 = producerConfig.getLong(defaultInstance.getMetadataMaxAge()).longValue();
                            if (longValue3 < 0) {
                                throw new ConfigException("metadata max age (" + longValue3 + ") cannot be negative number");
                            }
                        } catch (ConfigException e) {
                            LOG.error("Invalid metadata max age configuration");
                            throw e;
                        }
                    } catch (ConfigException e2) {
                        LOG.error("Invalid streams buffer time configuration");
                        throw e2;
                    }
                } catch (ConfigException e3) {
                    LOG.error("Invalid buffer memory configuration");
                    throw e3;
                }
            } catch (ConfigException e4) {
                LOG.error("Invalid streams paralle flushers configuration");
                throw e4;
            }
        } catch (ConfigException e5) {
            LOG.error("Invalid client id configuration");
            throw e5;
        }
    }

    public MarlinProducerImpl(ProducerConfig producerConfig) throws KafkaException {
        checkProducerConfig(producerConfig);
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        this.multiFlushers = producerConfig.getBoolean(defaultInstance.getParallelFlushersPerPartition());
        this.partitioner = (StreamsPartitioner) producerConfig.getConfiguredInstance(defaultInstance.getPartitioner(), StreamsPartitioner.class);
        this.defaultStreamName = null;
        int i = 0;
        boolean z = true;
        try {
            this.defaultStreamName = producerConfig.getString(defaultInstance.getProducerDefaultStream());
            i = producerConfig.getInt(defaultInstance.getRpcTimeout()).intValue();
            z = producerConfig.getBoolean(defaultInstance.getHardMount());
        } catch (ConfigException e) {
        }
        this.shuttingdown = false;
        this.MaxWorkQueues = MAX_WORK_QUEUE_SIZE;
        this._clntPtr = OpenProducer(producerConfig.getString(defaultInstance.getClientID()), i, z, producerConfig.getBoolean(defaultInstance.getParallelFlushersPerPartition()), producerConfig.getLong(defaultInstance.getBufferMemory()).longValue(), producerConfig.getLong(defaultInstance.getBufferTime()).longValue(), producerConfig.getLong(defaultInstance.getMetadataMaxAge()).longValue(), this.defaultStreamName);
        this.producerId = curProducerId.getAndIncrement();
        if (this._clntPtr == 0) {
            throw new KafkaException("Could not create MarlinProducer");
        }
        this.defaultClusterPath = GetDefaultClusterPath(this._clntPtr);
        this.workQueues = new ArrayList();
        this.workers = new ArrayList();
        for (int i2 = 0; i2 < this.MaxWorkQueues; i2++) {
            WorkQueue workQueue = new WorkQueue(1024);
            this.workQueues.add(workQueue);
            Thread thread = new Thread(new WorkerThread(workQueue, i2));
            this.workers.add(thread);
            thread.start();
        }
        this.topicNameToNumPartitions = new ConcurrentHashMap();
    }

    public Future<RecordMetadata> send(String str, int i, Object obj, byte[] bArr, Object obj2, byte[] bArr2, Callback callback) throws KafkaException {
        int length;
        if (this.shuttingdown) {
            LOG.error("Cannot send message when producer is shutting down.");
            if (callback != null) {
                callback.onCompletion(new RecordMetadata(new TopicPartition(str, -1), -1L, 0L), new RuntimeException("Producer is shutting down. Cannot send."));
            }
            return new FutureFailure(new RuntimeException("Producer is shutting down. Cannot send."));
        }
        if (bArr2 == null || bArr2.length == 0) {
            LOG.error("Cannot send message without any value");
            if (callback != null) {
                callback.onCompletion(new RecordMetadata(new TopicPartition(str, -1), -1L, 0L), new IllegalArgumentException("Value is empty"));
            }
            return new FutureFailure(new IllegalArgumentException("Value is empty"));
        }
        if (i == -1) {
            if (str.startsWith(this.defaultClusterPath)) {
                str = str.substring(this.defaultClusterPath.length());
            }
            try {
                i = this.partitioner.partition(str, obj, bArr, obj2, bArr2, getNumPartitions(str));
            } catch (KafkaException e) {
                LOG.error("Cannot send message, got an error while fetching number of partitions " + e);
                if (callback != null) {
                    callback.onCompletion(new RecordMetadata(new TopicPartition(str, -1), -1L, 0L), new IllegalArgumentException(e.toString()));
                }
                return new FutureFailure(new IllegalArgumentException(e.toString()));
            }
        }
        MarlinProducerResultImpl marlinProducerResultImpl = new MarlinProducerResultImpl(str, i, callback);
        ProducerRecordJob producerRecordJob = new ProducerRecordJob(marlinProducerResultImpl, bArr, bArr2);
        if (this.multiFlushers && i == -1 && bArr == null) {
            length = (int) (0 + this.counter.getAndIncrement());
        } else {
            length = 0 + str.length();
            if (i > 0) {
                length += i;
            }
        }
        try {
            this.workQueues.get(length % this.MaxWorkQueues).enqueue(producerRecordJob);
            this.outstandingMsgs.getAndIncrement();
            return new MarlinFuture(marlinProducerResultImpl);
        } catch (Exception e2) {
            LOG.error(e2.getMessage(), e2);
            return new FutureFailure(e2);
        }
    }

    public void flush() throws KafkaException {
        for (int i = 0; i < this.MaxWorkQueues; i++) {
            this.workQueues.get(i).enqueueFlushAndWait();
        }
        int Flush = Flush(this._clntPtr);
        if (Flush != 0) {
            throw MarlinClient.jniErrToException(Flush, "could not flush to MarlinProducer");
        }
    }

    public void handleJniCallbacks(int i, long[] jArr, MarlinProducerResult[] marlinProducerResultArr, int i2, int i3) {
        LOG.debug("Handling callback for " + i);
        for (int i4 = 0; i4 < i; i4++) {
            try {
                MarlinProducerResult marlinProducerResult = marlinProducerResultArr[i4];
                if (marlinProducerResult != null) {
                    marlinProducerResult.done(i2, jArr[i4], MarlinClient.jniErrToException(i3, null));
                    marlinProducerResult.onCompletion();
                } else {
                    LOG.error("producer.handleJniCallbacks() got null as result");
                }
            } catch (Throwable th) {
                LOG.error("Hitting exception during handleJniCallbacks");
                LOG.error("Exception " + th);
                return;
            }
        }
        long andAdd = this.outstandingMsgs.getAndAdd((-1) * i);
        if (andAdd < i) {
            LOG.error("outstandingMsgs are " + andAdd + " but we got results for " + i);
        }
        if (this.outstandingMsgs.get() == 0) {
            synchronized (this.outstandingMsgs) {
                if (this.outstandingMsgs.get() == 0) {
                    try {
                        this.outstandingMsgs.notifyAll();
                    } catch (Exception e) {
                        LOG.error("producer.handleJniCallbacks() exception while closing " + e);
                    }
                }
            }
        }
    }

    public void handleJniTopicMetadata(String str, int[] iArr, int[] iArr2, int i) {
        LOG.debug("Handling topic metadata changes for " + i + " topics");
        int i2 = 0;
        for (int i3 = 0; i3 < i; i3++) {
            try {
                String substring = str.substring(i2, i2 + iArr[i3]);
                i2 += iArr[i3];
                if (iArr2[i3] == 0) {
                    LOG.debug("Removing topic metadata for " + substring);
                    this.topicNameToNumPartitions.remove(substring);
                } else {
                    LOG.debug("Adding topic metadata for " + substring + " with " + iArr2[i3] + " partitions");
                    this.topicNameToNumPartitions.put(substring, Integer.valueOf(iArr2[i3]));
                }
            } catch (Throwable th) {
                LOG.error("Hitting exception during handleJniTopicMetadata");
                LOG.error("Exception " + th);
                return;
            }
        }
    }

    private int getNumPartitions(String str) throws KafkaException {
        Integer num = this.topicNameToNumPartitions.get(str);
        if (num == null) {
            num = Integer.valueOf(GetTopicInfo(this._clntPtr, str));
            if (num.intValue() < 0) {
                throw new UnknownTopicOrPartitionException("could not get TopicInfo, err " + (-num.intValue()));
            }
            this.topicNameToNumPartitions.put(str, num);
        }
        return num.intValue();
    }

    public List<PartitionInfo> getTopicInfo(String str) throws KafkaException {
        return new MarlinTopicInfo(str, getNumPartitions(str)).getKafkaPartitionInfo();
    }

    public void close(long j, TimeUnit timeUnit) {
        LOG.debug("Producer is shutting down");
        synchronized (this) {
            if (this.shuttingdown) {
                LOG.debug("Producer shutdown already in progress");
                return;
            }
            this.shuttingdown = true;
            LOG.debug("Producer actually shutting down");
            flush();
            for (int i = 0; i < this.MaxWorkQueues; i++) {
                this.workQueues.get(i).shutdown();
            }
            if (this.outstandingMsgs.get() != 0) {
                synchronized (this.outstandingMsgs) {
                    if (this.outstandingMsgs.get() != 0) {
                        try {
                            this.outstandingMsgs.wait();
                        } catch (Exception e) {
                            LOG.error("producer.close() exception while waiting on outstanding messages " + e);
                        }
                    }
                }
            }
            for (int i2 = 0; i2 < this.MaxWorkQueues; i2++) {
                try {
                    this.workers.get(i2).join();
                } catch (InterruptedException e2) {
                    throw new InterruptException(e2);
                }
            }
            LOG.debug("Calling jni producer close");
            CloseProducer(this._clntPtr);
            LOG.debug("Returned from jni producer close");
            this.MaxWorkQueues = 0;
            this._clntPtr = 0L;
        }
    }
}
