package voldemort.server.protocol.admin;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.admin.filter.DefaultVoldemortFilter;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Versioned;

/* loaded from: input_file:voldemort/server/protocol/admin/UpdatePartitionEntriesStreamRequestHandler.class */
/**
 * Stream request handler that applies a sequence of
 * {@code UpdatePartitionEntriesRequest} messages to a storage engine.
 *
 * <p>The first request is supplied to the constructor. Every following request
 * is read from the input stream as a 4-byte size followed by that many bytes of
 * serialized message. A size of {@code -1} ends the stream. Each entry is offered
 * to the configured {@link VoldemortFilter}; accepted entries are written with
 * {@code StorageEngine.put} and then counted against the throttler.
 *
 * <p>NOTE(review): {@code m1040build()} / {@code m1069build()} look like
 * decompiler-renamed {@code build()} methods; they are kept exactly as found.
 */
public class UpdatePartitionEntriesStreamRequestHandler implements StreamRequestHandler {
    /** Message size that signals the end of the update stream. */
    private static final int END_OF_STREAM_SIZE = -1;

    /** A debug progress line is emitted each time the entry counter reaches a multiple of this. */
    private static final int PROGRESS_LOG_INTERVAL = 100000;

    /** Request waiting to be applied; {@code null} once it has been applied and the next one must be read. */
    private VAdminProto.UpdatePartitionEntriesRequest request;
    private final ErrorCodeMapper errorCodeMapper;
    private final EventThrottler throttler;
    private final VoldemortFilter filter;
    private final StorageEngine<ByteArray, byte[]> storageEngine;
    /** Number of entries handled so far (accepted or filtered out). */
    private int counter;
    /** Wall-clock time at construction, used only for progress logging. */
    private final long startTime;
    /** Response written by {@link #close(DataOutputStream)}; carries the error set by {@link #handleError}. */
    private final VAdminProto.UpdatePartitionEntriesResponse.Builder responseBuilder = VAdminProto.UpdatePartitionEntriesResponse.newBuilder();
    private final Logger logger = Logger.getLogger(getClass());

    /**
     * Creates a handler whose first entry to apply is the one carried by
     * {@code updatePartitionEntriesRequest}.
     *
     * @param updatePartitionEntriesRequest initial request; also names the store and, optionally, the filter
     * @param errorCodeMapper used to encode errors into the response
     * @param voldemortConfig supplies the throttle rate and is passed on when building the filter
     * @param storeRepository used to look up the storage engine for the request's store
     * @param networkClassLoader passed on when building the filter from the request
     */
    public UpdatePartitionEntriesStreamRequestHandler(VAdminProto.UpdatePartitionEntriesRequest updatePartitionEntriesRequest, ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, NetworkClassLoader networkClassLoader) {
        this.request = updatePartitionEntriesRequest;
        this.errorCodeMapper = errorCodeMapper;
        this.storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository, updatePartitionEntriesRequest.getStore());
        this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
        if (updatePartitionEntriesRequest.hasFilter()) {
            this.filter = AdminServiceRequestHandler.getFilterFromRequest(updatePartitionEntriesRequest.getFilter(), voldemortConfig, networkClassLoader);
        } else {
            this.filter = new DefaultVoldemortFilter();
        }
        this.startTime = System.currentTimeMillis();
    }

    /**
     * Applies one partition entry, reading the next request from the stream
     * first if none is pending.
     *
     * @param dataInputStream stream the size-prefixed requests are read from
     * @param dataOutputStream not used by this method
     * @return {@code COMPLETE} when the end-of-stream size is read,
     *         {@code INCOMPLETE_READ} when the stream ends in the middle of a
     *         size or message, otherwise {@code READING} after the entry has
     *         been handled
     * @throws IOException if reading or parsing fails, or the message size is
     *         negative and is not the end-of-stream marker
     */
    @Override // voldemort.server.protocol.StreamRequestHandler
    public StreamRequestHandler.StreamRequestHandlerState handleRequest(DataInputStream dataInputStream, DataOutputStream dataOutputStream) throws IOException {
        if (this.request == null) {
            StreamRequestHandler.StreamRequestHandlerState earlyState = readNextRequest(dataInputStream);
            if (earlyState != null) {
                return earlyState;
            }
        }

        applyEntry(this.request.getPartitionEntry());
        logProgress();

        // Cleared only after the entry was handled, so a failure above leaves
        // the offending request available to handleError() for logging.
        this.request = null;
        return StreamRequestHandler.StreamRequestHandlerState.READING;
    }

    /**
     * Reads the next size-prefixed request from the stream into {@link #request}.
     *
     * @return {@code null} when a request was read and stored; otherwise the
     *         state that {@link #handleRequest} must return immediately
     */
    private StreamRequestHandler.StreamRequestHandlerState readNextRequest(DataInputStream input) throws IOException {
        int messageSize;
        try {
            messageSize = input.readInt();
        } catch (EOFException e) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Incomplete read for message size");
            }
            return StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ;
        }

        if (messageSize == END_OF_STREAM_SIZE) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Message size -1, completed partition update");
            }
            return StreamRequestHandler.StreamRequestHandlerState.COMPLETE;
        }
        if (messageSize < 0) {
            // Without this check the array allocation below would fail with an
            // unchecked NegativeArraySizeException carrying no context.
            throw new IOException("Invalid UpdatePartitionEntriesRequest message size: " + messageSize);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("UpdatePartitionEntriesRequest message size: " + messageSize);
        }

        byte[] payload = new byte[messageSize];
        try {
            ByteUtils.read(input, payload);
            VAdminProto.UpdatePartitionEntriesRequest.Builder builder = VAdminProto.UpdatePartitionEntriesRequest.newBuilder();
            builder.mergeFrom(payload);
            this.request = builder.m1040build();
        } catch (EOFException e) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Incomplete read for message");
            }
            return StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ;
        }
        return null;
    }

    /**
     * Decodes the entry and, if the filter accepts it, stores it and throttles
     * by key length plus value size. An {@link ObsoleteVersionException} from
     * the put is logged at debug level and otherwise ignored.
     */
    private void applyEntry(VAdminProto.PartitionEntry partitionEntry) {
        ByteArray key = ProtoUtils.decodeBytes(partitionEntry.getKey());
        Versioned<byte[]> value = ProtoUtils.decodeVersioned(partitionEntry.getVersioned());
        if (!this.filter.accept(key, value)) {
            return;
        }

        try {
            this.storageEngine.put(key, value);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("updateEntries (Streaming put) successful");
            }
        } catch (ObsoleteVersionException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("updateEntries (Streaming put) threw ObsoleteVersionException, Ignoring.");
            }
        }
        // Throttled for every accepted entry, including ones rejected as obsolete.
        this.throttler.maybeThrottle(key.length() + AdminServiceRequestHandler.valueSize(value));
    }

    /** Counts the entry just handled and logs a debug progress line every {@link #PROGRESS_LOG_INTERVAL} entries. */
    private void logProgress() {
        this.counter++;
        if (this.counter % PROGRESS_LOG_INTERVAL == 0 && this.logger.isDebugEnabled()) {
            long elapsedSeconds = (System.currentTimeMillis() - this.startTime) / 1000;
            this.logger.debug("updateEntries() updated " + this.counter + " entries for store:" + this.storageEngine.getName() + " in " + elapsedSeconds + " s");
        }
    }

    /**
     * @return {@code READING}
     */
    @Override // voldemort.server.protocol.StreamRequestHandler
    public StreamRequestHandler.StreamRequestDirection getDirection() {
        return StreamRequestHandler.StreamRequestDirection.READING;
    }

    /**
     * Writes the response, including any error recorded by
     * {@link #handleError}, to the output stream.
     *
     * @param dataOutputStream stream the response message is written to
     * @throws IOException if writing the response fails
     */
    @Override // voldemort.server.protocol.StreamRequestHandler
    public void close(DataOutputStream dataOutputStream) throws IOException {
        ProtoUtils.writeMessage(dataOutputStream, this.responseBuilder.m1069build());
    }

    /**
     * Records the error in the pending response and logs it together with the
     * request that was pending at the time. Nothing is written to the output
     * stream here.
     *
     * @param dataOutputStream not used by this method
     * @param voldemortException the failure to record
     */
    @Override // voldemort.server.protocol.StreamRequestHandler
    public void handleError(DataOutputStream dataOutputStream, VoldemortException voldemortException) throws IOException {
        this.responseBuilder.setError(ProtoUtils.encodeError(this.errorCodeMapper, voldemortException));
        if (this.logger.isEnabledFor(Level.ERROR)) {
            this.logger.error("handleUpdatePartitionEntries failed for request(" + this.request + ")", voldemortException);
        }
    }
}
