package org.apache.zookeeper.server.quorum;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.LocalSessionRequestTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorTest.class */
public class CommitProcessorTest extends ZKTestCase {
    /** Logger shared by the test and its nested helper classes. */
    protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorTest.class);
    /** Server under test; assigned in {@link #setUp(int, int)} and shut down in {@link #tearDown()}. */
    TestZooKeeperServer zks;
    /** Temporary directory handed to the server constructor; deleted again in {@link #tearDown()}. */
    File tmpDir;
    /** Count of requests that did not need a commit and made it through {@code ValidateProcessor}. */
    private AtomicInteger processedReadRequests = new AtomicInteger(0);
    /** Count of requests that needed a commit and made it through {@code ValidateProcessor}. */
    private AtomicInteger processedWriteRequests = new AtomicInteger(0);
    /** Client threads started by {@link #setUp(int, int)}; interrupted and joined in {@link #tearDown()}. */
    ArrayList<TestClientThread> testClients = new ArrayList<>();
    /** Set to {@code true} by {@code failTest}; read by the test methods after their wait. */
    volatile boolean fail = false;

    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorTest$MockProposalRequestProcessor.class */
    /**
     * Test double that sits in front of the {@link CommitProcessor}.
     *
     * <p>Every incoming request is handed to the commit processor immediately. Requests that
     * carry a transaction header are additionally queued, and this thread commits them one at a
     * time after a random delay of 10-199 ms.
     */
    private class MockProposalRequestProcessor extends Thread implements RequestProcessor {
        private final CommitProcessor commitProcessor;
        /** Requests with a header that still have to be committed by {@link #run()}. */
        private final LinkedBlockingQueue<Request> proposals = new LinkedBlockingQueue<>();

        /**
         * @param commitProcessor processor that receives both the requests and their commits
         */
        public MockProposalRequestProcessor(CommitProcessor commitProcessor) {
            this.commitProcessor = commitProcessor;
        }

        /**
         * Takes queued proposals in FIFO order and commits each one after a random pause.
         * Returns as soon as the thread is interrupted while blocked on the queue or sleeping.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            // Seeded with the thread id, so the delay sequence depends only on that id.
            Random random = new Random(Thread.currentThread().getId());
            try {
                while (true) {
                    Request proposal = this.proposals.take();
                    Thread.sleep(10 + random.nextInt(190));
                    this.commitProcessor.commit(proposal);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal (see shutdown()); the thread simply ends.
            }
        }

        /**
         * Forwards the request to the commit processor and, when it has a transaction header,
         * queues it so that {@link #run()} commits it later.
         *
         * @param request request to forward
         * @throws RequestProcessor.RequestProcessorException declared by the interface
         */
        public void processRequest(Request request) throws RequestProcessor.RequestProcessorException {
            this.commitProcessor.processRequest(request);
            if (request.getHdr() != null) {
                this.proposals.add(request);
            }
        }

        /**
         * Stops the committer thread. Previously this was a no-op, which left the thread blocked
         * on the proposal queue forever; interrupting it makes {@link #run()} return.
         */
        public void shutdown() {
            interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorTest$TestClientThread.class */
    /**
     * Simulated client: owns one session and pushes a mix of create and getData requests
     * straight into the server's first request processor.
     */
    public class TestClientThread extends Thread {
        /** Session id obtained from the server's session tracker in the constructor. */
        long sessionId;
        /** Last client transaction id used; incremented before every submitted request. */
        int cxid;
        /** Suffix of the most recently created node; incremented before every create. */
        int nodeId;

        /** Creates a session on the enclosing test's server for this client. */
        public TestClientThread() {
            this.sessionId = CommitProcessorTest.this.zks.getSessionTracker().createSession(QuorumPeerTestBase.TIMEOUT);
        }

        /**
         * Submits a create request for the next node path {@code /session<hex session id>-<n>}.
         *
         * @throws Exception if serializing or submitting the request fails
         */
        public void sendWriteRequest() throws Exception {
            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            this.nodeId++;
            String path = "/session" + Long.toHexString(this.sessionId) + "-" + this.nodeId;
            // The trailing 1 is the create flags value; presumably the ephemeral flag - TODO confirm.
            CreateRequest createRequest = new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 1);
            createRequest.serialize(BinaryOutputArchive.getArchive(serialized), "request");
            submit(ZooDefs.OpCode.create, serialized);
        }

        /**
         * Submits a getData request (no watch) for the node most recently created by this client.
         *
         * @throws Exception if serializing or submitting the request fails
         */
        public void sendReadRequest() throws Exception {
            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            String path = "/session" + Long.toHexString(this.sessionId) + "-" + this.nodeId;
            GetDataRequest getDataRequest = new GetDataRequest(path, false);
            getDataRequest.serialize(BinaryOutputArchive.getArchive(serialized), "request");
            submit(ZooDefs.OpCode.getData, serialized);
        }

        /**
         * Wraps the serialized body in a {@link Request} with the next cxid and hands it to the
         * server's first processor.
         */
        private void submit(int opCode, ByteArrayOutputStream serialized) throws Exception {
            ByteBuffer payload = ByteBuffer.wrap(serialized.toByteArray());
            this.cxid++;
            CommitProcessorTest.this.zks.firstProcessor.processRequest(
                    new Request((ServerCnxn) null, this.sessionId, this.cxid, opCode, payload, new ArrayList()));
        }

        /**
         * Sends one initial write, then 1000 further requests of which roughly a quarter are
         * writes and the rest reads. Any exception is logged and ends the thread.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            // Seeded with the thread id, so the read/write pattern depends only on that id.
            Random random = new Random(Thread.currentThread().getId());
            try {
                // Start with a write so that later reads have a node to target.
                sendWriteRequest();
                for (int remaining = 1000; remaining > 0; remaining--) {
                    boolean read = random.nextInt(100) >= 25;
                    if (read) {
                        sendReadRequest();
                    } else {
                        sendWriteRequest();
                    }
                }
            } catch (Exception e) {
                CommitProcessorTest.LOG.error("Uncaught exception in test: ", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorTest$TestZooKeeperServer.class */
    /**
     * ZooKeeper server whose request pipeline is replaced by
     * PrepRequestProcessor -> MockProposalRequestProcessor -> CommitProcessor
     * -> ValidateProcessor -> FinalRequestProcessor.
     */
    public class TestZooKeeperServer extends ZooKeeperServer {
        // NOTE(review): these fields may hide same-named fields of ZooKeeperServer; confirm that
        // the base class shutdown() actually reaches this processor chain.
        /** Head of the pipeline; client threads submit their requests here. */
        PrepRequestProcessor firstProcessor;
        /** Commit processor under test. */
        CommitProcessor commitProcessor;

        /** Passes all three arguments unchanged to the {@link ZooKeeperServer} constructor. */
        public TestZooKeeperServer(File file, File file2, int i) throws IOException {
            super(file, file2, i);
        }

        /** @return the session tracker of this server, used by clients to create sessions */
        public SessionTracker getSessionTracker() {
            return this.sessionTracker;
        }

        /**
         * Builds the pipeline back to front and starts each threaded stage as soon as it has
         * been constructed.
         */
        protected void setupRequestProcessors() {
            // Tail of the chain: validation wrapped around the real final processor.
            FinalRequestProcessor finalProcessor = new FinalRequestProcessor(CommitProcessorTest.this.zks);
            ValidateProcessor validator = new ValidateProcessor(finalProcessor);

            this.commitProcessor = new CommitProcessor(validator, "1", true, getZooKeeperServerListener());
            // The validator asks the commit processor whether a request needs a commit.
            validator.setCommitProcessor(this.commitProcessor);
            this.commitProcessor.start();

            MockProposalRequestProcessor proposer = new MockProposalRequestProcessor(this.commitProcessor);
            proposer.start();

            this.firstProcessor = new PrepRequestProcessor(CommitProcessorTest.this.zks, proposer);
            this.firstProcessor.start();
        }
    }

    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorTest$ValidateProcessor.class */
    /**
     * Request processor placed behind the {@link CommitProcessor} that checks the ordering of
     * what comes out of it and fails the test on any violation:
     * <ul>
     *   <li>a write never starts while a read is outstanding, and at most one write is
     *       outstanding at a time;</li>
     *   <li>a read never starts while a write is outstanding;</li>
     *   <li>zxids of requests with a header are consecutive, starting at 1;</li>
     *   <li>cxids of each session are consecutive.</li>
     * </ul>
     * Each request is delayed by a random 10-299 ms before being passed on.
     */
    private class ValidateProcessor implements RequestProcessor {
        /** Processor that receives every request after the artificial delay. */
        RequestProcessor nextProcessor;
        /** Used only to classify a request as write ({@code needCommit}) or read. */
        CommitProcessor commitProcessor;
        /** Delay source, seeded with the id of the thread that constructs this processor. */
        Random rand = new Random(Thread.currentThread().getId());
        /** Zxid the next request with a header must carry. */
        AtomicLong expectedZxid = new AtomicLong(1);
        /** Per session id: the cxid the next request of that session must carry. */
        ConcurrentHashMap<Long, AtomicInteger> cxidMap = new ConcurrentHashMap<>();
        /** Reads currently between the start of processRequest and the return of the next processor. */
        AtomicInteger outstandingReadRequests = new AtomicInteger(0);
        /** Writes currently between the start of processRequest and the return of the next processor. */
        AtomicInteger outstandingWriteRequests = new AtomicInteger(0);

        /**
         * @param requestProcessor processor to delegate to after validation
         */
        public ValidateProcessor(RequestProcessor requestProcessor) {
            this.nextProcessor = requestProcessor;
        }

        /** Sets the commit processor; must be called before the first request arrives. */
        public void setCommitProcessor(CommitProcessor commitProcessor) {
            this.commitProcessor = commitProcessor;
        }

        /**
         * Checks the read/write exclusion invariants, sleeps for a random time, forwards the
         * request, updates the processed counters and finally checks zxid/cxid ordering.
         *
         * @param request request emitted by the commit processor
         * @throws RequestProcessor.RequestProcessorException if the next processor throws it
         */
        public void processRequest(Request request) throws RequestProcessor.RequestProcessorException {
            boolean isWrite = this.commitProcessor.needCommit(request);
            if (isWrite) {
                this.outstandingWriteRequests.incrementAndGet();
                validateWriteRequestVariant(request);
                CommitProcessorTest.LOG.debug("Starting write request zxid={}", request.zxid);
            } else {
                CommitProcessorTest.LOG.debug("Starting read request cxid={} for session 0x{}",
                        request.cxid, Long.toHexString(request.sessionId));
                this.outstandingReadRequests.incrementAndGet();
                validateReadRequestVariant(request);
            }
            // Hold the request for a while so overlapping requests have a chance to show up.
            try {
                Thread.sleep(10 + this.rand.nextInt(290));
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the delay is only there to widen race windows, and the
                // request is still forwarded and validated below.
            }
            this.nextProcessor.processRequest(request);
            if (isWrite) {
                this.outstandingWriteRequests.decrementAndGet();
                CommitProcessorTest.LOG.debug("Done write request zxid={}", request.zxid);
                CommitProcessorTest.this.processedWriteRequests.incrementAndGet();
            } else {
                this.outstandingReadRequests.decrementAndGet();
                CommitProcessorTest.LOG.debug("Done read request cxid={} for session 0x{}",
                        request.cxid, Long.toHexString(request.sessionId));
                CommitProcessorTest.this.processedReadRequests.incrementAndGet();
            }
            validateRequest(request);
        }

        /**
         * Fails the test if a write starts while any read is outstanding or while another write
         * is outstanding. The caller has already counted this write, hence the {@code > 1}.
         */
        private void validateWriteRequestVariant(Request request) {
            long zxid = request.getHdr().getZxid();
            int reads = this.outstandingReadRequests.get();
            if (reads != 0) {
                CommitProcessorTest.this.failTest("There are " + reads + " outstanding read requests while issuing a write request zxid=" + zxid);
            }
            int writes = this.outstandingWriteRequests.get();
            if (writes > 1) {
                CommitProcessorTest.this.failTest("There are " + writes + " outstanding write requests while issuing a write request zxid=" + zxid + " (expected one)");
            }
        }

        /** Fails the test if a read starts while any write is outstanding. */
        private void validateReadRequestVariant(Request request) {
            int writes = this.outstandingWriteRequests.get();
            if (writes != 0) {
                CommitProcessorTest.this.failTest("There are " + writes + " outstanding write requests while issuing a read request cxid=" + request.cxid + " for session 0x" + Long.toHexString(request.sessionId));
            }
        }

        /**
         * Checks that the finished request carries the expected zxid (when it has a header) and
         * the expected cxid for its session, advancing both expectations by one on success.
         */
        private void validateRequest(Request request) {
            CommitProcessorTest.LOG.info("Got request {}", request);
            if (request.getHdr() != null) {
                long zxid = request.getHdr().getZxid();
                if (!this.expectedZxid.compareAndSet(zxid, zxid + 1)) {
                    // Report the offending zxid (the old message printed the test object here).
                    CommitProcessorTest.this.failTest("Write request, expected_zxid=" + this.expectedZxid.get() + "; req_zxid=" + zxid);
                }
            }
            AtomicInteger expectedCxid = this.cxidMap.get(Long.valueOf(request.sessionId));
            if (expectedCxid == null) {
                // First request seen for this session: record what the next cxid must be.
                AtomicInteger raced = this.cxidMap.putIfAbsent(Long.valueOf(request.sessionId), new AtomicInteger(request.cxid + 1));
                if (raced != null) {
                    CommitProcessorTest.this.failTest("Race condition adding cxid=" + request.cxid + " for session 0x" + Long.toHexString(request.sessionId) + " with other_cxid=" + raced.get());
                }
            } else if (!expectedCxid.compareAndSet(request.cxid, request.cxid + 1)) {
                CommitProcessorTest.this.failTest("Expected_cxid=" + expectedCxid.get() + "; req_cxid=" + request.cxid);
            }
        }

        /** Nothing to release; shutdown is not propagated to the next processor. */
        public void shutdown() {
        }
    }

    /**
     * Starts a fresh {@link TestZooKeeperServer} on a new temporary directory and launches the
     * client threads. Called explicitly by each test rather than through {@code @Before}.
     *
     * @param i value written to the {@code zookeeper.commitProcessor.numWorkerThreads} property
     * @param i2 number of {@link TestClientThread}s to create and start
     * @throws Exception if the test environment or the server cannot be set up
     */
    public void setUp(int i, int i2) throws Exception {
        System.setProperty("zookeeper.commitProcessor.numWorkerThreads", String.valueOf(i));
        System.setProperty("zookeeper.admin.enableServer", "false");

        File dir = ClientBase.createTmpDir();
        this.tmpDir = dir;
        ClientBase.setupTestEnv();

        // The same directory is passed for both File arguments of the server constructor.
        this.zks = new TestZooKeeperServer(dir, dir, LocalSessionRequestTest.CONNECTION_TIMEOUT);
        this.zks.startup();

        int toStart = i2;
        while (toStart-- > 0) {
            TestClientThread client = new TestClientThread();
            this.testClients.add(client);
            client.start();
        }
    }

    /**
     * Stops the client threads, shuts the server down and removes the temporary directory.
     *
     * <p>Tolerates a {@code setUp} that failed part-way: the server is only shut down if it was
     * created, and the directory is deleted even when stopping clients or the server throws.
     *
     * @throws Exception if joining a client or shutting down the server fails, or (as an
     *     assertion error) if the temporary directory could not be deleted
     */
    @After
    public void tearDown() throws Exception {
        LOG.info("tearDown starting");
        boolean deleted = true;
        try {
            for (TestClientThread client : this.testClients) {
                client.interrupt();
                client.join();
            }
            if (this.zks != null) {
                this.zks.shutdown();
            }
        } finally {
            // Only record the result here; asserting inside finally could mask an exception
            // that is already propagating out of the try block.
            if (this.tmpDir != null) {
                deleted = ClientBase.recursiveDelete(this.tmpDir);
            }
        }
        Assert.assertTrue("delete " + this.tmpDir, deleted);
    }

    /** Runs the workload with the commit processor worker-thread property set to 0. */
    @Test
    public void testNoCommitWorkers() throws Exception {
        runWorkloadAndVerify(0, 10);
    }

    /** Runs the workload with the commit processor worker-thread property set to 1. */
    @Test
    public void testOneCommitWorker() throws Exception {
        runWorkloadAndVerify(1, 10);
    }

    /** Runs the workload with the commit processor worker-thread property set to 10. */
    @Test
    public void testManyCommitWorkers() throws Exception {
        runWorkloadAndVerify(10, 10);
    }

    /**
     * Starts the server and clients, lets them run for up to five seconds (or until a
     * validation failure is signalled), then asserts that both reads and writes were processed
     * and that no failure was recorded.
     *
     * @param numCommitThreads worker-thread count passed to {@link #setUp(int, int)}
     * @param numClientThreads number of client threads passed to {@link #setUp(int, int)}
     * @throws Exception if set-up fails or the waiting thread is interrupted
     */
    private void runWorkloadAndVerify(int numCommitThreads, int numClientThreads) throws Exception {
        setUp(numCommitThreads, numClientThreads);
        final long deadlineNanos = System.nanoTime() + 5000L * 1000000L;
        synchronized (this) {
            // Wait in a loop on the actual condition: Object.wait may return spuriously, and
            // failTest may have set the flag before this thread acquired the monitor.
            while (!this.fail) {
                long remainingMs = (deadlineNanos - System.nanoTime()) / 1000000L;
                if (remainingMs <= 0) {
                    break;
                }
                wait(remainingMs);
            }
        }
        checkProcessedRequest();
        Assert.assertFalse(this.fail);
    }

    /** Asserts that at least one read and at least one write request were fully processed. */
    private void checkProcessedRequest() {
        int reads = this.processedReadRequests.get();
        Assert.assertTrue("No read requests processed", reads > 0);

        int writes = this.processedWriteRequests.get();
        Assert.assertTrue("No write requests processed", writes > 0);
    }

    /**
     * Records a validation failure, wakes every thread waiting on this test instance and then
     * throws an assertion error in the calling thread.
     *
     * @param str failure message passed to {@link Assert#fail(String)}
     */
    private void failTest(String str) {
        synchronized (this) {
            fail = true;
            // Wake the test thread waiting on this object's monitor.
            this.notifyAll();
            Assert.fail(str);
        }
    }
}
