/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.protocol.admin;

import com.google.protobuf.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.admin.filter.DefaultVoldemortFilter;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.server.protocol.admin.AdminServiceRequestHandler;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Versioned;

public class UpdatePartitionEntriesStreamRequestHandler
implements StreamRequestHandler {
    private VAdminProto.UpdatePartitionEntriesRequest request;
    private final VAdminProto.UpdatePartitionEntriesResponse.Builder responseBuilder = VAdminProto.UpdatePartitionEntriesResponse.newBuilder();
    private final ErrorCodeMapper errorCodeMapper;
    private final EventThrottler throttler;
    private final VoldemortFilter filter;
    private final StorageEngine<ByteArray, byte[]> storageEngine;
    private int counter;
    private final long startTime;
    private final Logger logger = Logger.getLogger(this.getClass());

    public UpdatePartitionEntriesStreamRequestHandler(VAdminProto.UpdatePartitionEntriesRequest request, ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, NetworkClassLoader networkClassLoader) {
        this.request = request;
        this.errorCodeMapper = errorCodeMapper;
        this.storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository, request.getStore());
        this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
        this.filter = request.hasFilter() ? AdminServiceRequestHandler.getFilterFromRequest(request.getFilter(), voldemortConfig, networkClassLoader) : new DefaultVoldemortFilter();
        this.startTime = System.currentTimeMillis();
    }

    public StreamRequestHandler.StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException {
        Versioned<byte[]> value;
        VAdminProto.PartitionEntry partitionEntry;
        ByteArray key;
        if (this.request == null) {
            int size = 0;
            try {
                size = inputStream.readInt();
            }
            catch (EOFException e) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Incomplete read for message size");
                }
                return StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ;
            }
            if (size == -1) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Message size -1, completed partition update");
                }
                return StreamRequestHandler.StreamRequestHandlerState.COMPLETE;
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("UpdatePartitionEntriesRequest message size: " + size);
            }
            byte[] input = new byte[size];
            try {
                ByteUtils.read(inputStream, input);
            }
            catch (EOFException e) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Incomplete read for message");
                }
                return StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ;
            }
            VAdminProto.UpdatePartitionEntriesRequest.Builder builder = VAdminProto.UpdatePartitionEntriesRequest.newBuilder();
            builder.mergeFrom(input);
            this.request = builder.build();
        }
        if (this.filter.accept(key = ProtoUtils.decodeBytes((partitionEntry = this.request.getPartitionEntry()).getKey()), value = ProtoUtils.decodeVersioned(partitionEntry.getVersioned()))) {
            block16: {
                try {
                    this.storageEngine.put(key, value);
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("updateEntries (Streaming put) successful");
                    }
                }
                catch (ObsoleteVersionException e) {
                    if (!this.logger.isDebugEnabled()) break block16;
                    this.logger.debug("updateEntries (Streaming put) threw ObsoleteVersionException, Ignoring.");
                }
            }
            this.throttler.maybeThrottle(key.length() + AdminServiceRequestHandler.valueSize(value));
        }
        ++this.counter;
        if (0 == this.counter % 100000) {
            long totalTime = (System.currentTimeMillis() - this.startTime) / 1000L;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("updateEntries() updated " + this.counter + " entries for store:" + this.storageEngine.getName() + " in " + totalTime + " s");
            }
        }
        this.request = null;
        return StreamRequestHandler.StreamRequestHandlerState.READING;
    }

    public StreamRequestHandler.StreamRequestDirection getDirection() {
        return StreamRequestHandler.StreamRequestDirection.READING;
    }

    public void close(DataOutputStream outputStream) throws IOException {
        ProtoUtils.writeMessage(outputStream, (Message)this.responseBuilder.build());
    }

    public void handleError(DataOutputStream outputStream, VoldemortException e) throws IOException {
        this.responseBuilder.setError(ProtoUtils.encodeError(this.errorCodeMapper, e));
        if (this.logger.isEnabledFor(Level.ERROR)) {
            this.logger.error("handleUpdatePartitionEntries failed for request(" + (Object)((Object)this.request) + ")", e);
        }
    }
}

