Class CommitProcessor

  • All Implemented Interfaces:
    java.lang.Runnable, RequestProcessor

    public class CommitProcessor
    extends ZooKeeperCriticalThread
    implements RequestProcessor
    This RequestProcessor matches the incoming committed requests with the locally submitted requests. The trick is that locally submitted requests that change the state of the system will come back as incoming committed requests, so we need to match them up.

    The CommitProcessor is multi-threaded. Communication between threads is handled via queues, atomics, and wait/notifyAll synchronized on the processor. The CommitProcessor acts as a gateway for allowing requests to continue with the remainder of the processing pipeline. It will allow many read requests but only a single write request to be in flight simultaneously, thus ensuring that write requests are processed in transaction id order.

    - 1 commit processor main thread, which watches the request queues and assigns requests to worker threads based on their sessionId so that read and write requests for a particular session are always assigned to the same thread (and hence are guaranteed to run in order).
    - 0-N worker threads, which run the rest of the request processor pipeline on the requests. If configured with 0 worker threads, the primary commit processor thread runs the pipeline directly.

    Typical (default) thread counts are: on a 32 core machine, 1 commit processor thread and 32 worker threads.

    Multi-threading constraints:
    - Each session's requests must be processed in order.
    - Write requests must be processed in zxid order.
    - Must ensure no race condition between writes in one session that would trigger a watch being set by a read request in another session.

    The current implementation solves the third constraint by simply allowing no read requests to be processed in parallel with write requests.
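
The session-to-worker assignment described above can be illustrated with a small standalone sketch. It is not ZooKeeper's code: the class `SessionAffinitySketch`, its methods, and the use of `Math.floorMod` to pick a worker are assumptions made for illustration. The only property it tries to show is that requests with the same sessionId always land on the same single-threaded worker, and that with 0 workers the calling thread runs the task directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of per-session worker affinity; not part of ZooKeeper. */
final class SessionAffinitySketch {
    // One single-threaded executor per worker, so tasks given to the
    // same worker run in submission order.
    private final List<ExecutorService> workers = new ArrayList<>();

    SessionAffinitySketch(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Maps a session id to a worker index in [0, numWorkers). Assumed scheme. */
    static int workerIndex(long sessionId, int numWorkers) {
        // floorMod keeps the result non-negative even for negative session ids.
        return (int) Math.floorMod(sessionId, (long) numWorkers);
    }

    /** Runs the task on the worker owned by the session, or inline with 0 workers. */
    void submit(long sessionId, Runnable task) {
        if (workers.isEmpty()) {
            task.run(); // 0 worker threads: the calling thread runs the pipeline directly
            return;
        }
        workers.get(workerIndex(sessionId, workers.size())).execute(task);
    }

    /** Stops accepting work and waits up to timeoutMs per worker for it to drain. */
    void shutdown(long timeoutMs) throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        for (ExecutorService w : workers) {
            w.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SessionAffinitySketch pool = new SessionAffinitySketch(4);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            // All three tasks share session 42, so they run on one worker, in order.
            pool.submit(42L, () -> System.out.println("session 42, request " + n));
        }
        pool.shutdown(5000);
    }
}
```

Pinning a session to one single-threaded worker gives per-session ordering without any extra locking. It does not by itself enforce zxid ordering of writes across sessions, which is why the description adds the separate rule that only one write is in flight at a time.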
    • Method Summary

      Modifier and Type    Method                          Description
      void                 commit(Request request)
      protected boolean    needCommit(Request request)
      protected void       processCommitted()
      void                 processRequest(Request request)
      void                 run()
      void                 shutdown()
      void                 start()
      • Methods inherited from class java.lang.Thread

        activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, stop, suspend, toString, yield
      • Methods inherited from class java.lang.Object

        equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
    • Field Detail

      • ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS

        public static final java.lang.String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS
        Default: numCores (one worker thread per processor core)
        See Also:
        Constant Field Values
      • ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT

        public static final java.lang.String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT
        Default worker pool shutdown timeout in ms: 5000 (5s)
        See Also:
        Constant Field Values
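
As a rough illustration of how these two settings and their documented defaults could be resolved from system properties, consider the sketch below. The property keys are assumptions: this page only links to Constant Field Values and does not print the strings. The class `CommitProcessorConfigSketch` is hypothetical and does not use any ZooKeeper classes.

```java
/** Hypothetical sketch of resolving the two settings; not ZooKeeper's code. */
final class CommitProcessorConfigSketch {
    // Assumed property keys. Check the Constant Field Values page for the
    // actual strings behind the two constants documented here.
    static final String NUM_WORKER_THREADS_KEY = "zookeeper.commitProcessor.numWorkerThreads";
    static final String SHUTDOWN_TIMEOUT_KEY = "zookeeper.commitProcessor.shutdownTimeout";

    /** Worker thread count; falls back to the number of cores (the documented default). */
    static int numWorkerThreads() {
        return Integer.getInteger(NUM_WORKER_THREADS_KEY,
                Runtime.getRuntime().availableProcessors());
    }

    /** Worker pool shutdown timeout in ms; falls back to 5000 (the documented default). */
    static long shutdownTimeoutMs() {
        return Long.getLong(SHUTDOWN_TIMEOUT_KEY, 5000L);
    }

    public static void main(String[] args) {
        System.out.println("worker threads: " + numWorkerThreads());
        System.out.println("shutdown timeout (ms): " + shutdownTimeoutMs());
    }
}
```

Under these assumptions, passing `-Dzookeeper.commitProcessor.numWorkerThreads=0` on the JVM command line would select the 0-worker mode, in which the main commit processor thread runs the pipeline directly.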
      • queuedRequests

        protected final java.util.concurrent.LinkedBlockingQueue<Request> queuedRequests
        Requests that we are holding until the commit comes in.
      • committedRequests

        protected final java.util.concurrent.LinkedBlockingQueue<Request> committedRequests
        Requests that have been committed.
      • nextPending

        protected final java.util.concurrent.atomic.AtomicReference<Request> nextPending
        Request for which we are currently awaiting a commit.
      • numRequestsProcessing

        protected java.util.concurrent.atomic.AtomicInteger numRequestsProcessing
        The number of requests currently being processed.
      • stopped

        protected volatile boolean stopped
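
The way these fields might cooperate can be sketched as a simplified, single-threaded model. This is a sketch under stated assumptions, not the actual implementation: `Req` is a hypothetical stand-in for `Request`, matching a commit by `sessionId` and `cxid` is assumed, `writeInFlight` is helper state invented for the example, and `step()` and `finished()` are illustrative names. The real class coordinates several threads with wait/notifyAll, which this model leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-threaded model of the commit-matching gate. */
final class CommitMatchSketch {
    /** Hypothetical stand-in for Request. */
    static final class Req {
        final long sessionId;
        final int cxid;
        final boolean write;
        Req(long sessionId, int cxid, boolean write) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.write = write;
        }
    }

    // Same names and types as the documented fields.
    final LinkedBlockingQueue<Req> queuedRequests = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Req> committedRequests = new LinkedBlockingQueue<>();
    final AtomicReference<Req> nextPending = new AtomicReference<>();
    final AtomicInteger numRequestsProcessing = new AtomicInteger();

    // Hypothetical helper state: the committed write currently in the pipeline.
    private Req writeInFlight;
    // Records what was passed on to the rest of the pipeline, in order.
    final List<Req> forwarded = new ArrayList<>();

    /** One pass of the main loop. */
    void step() {
        Req r;
        // Pass reads through until a write is reached; the write then waits
        // in nextPending for its commit. Nothing is dequeued while a write
        // is pending or in flight.
        while (nextPending.get() == null && writeInFlight == null
                && (r = queuedRequests.poll()) != null) {
            if (r.write) {
                nextPending.set(r);
            } else {
                forward(r);
            }
        }
        // A commit is applied only when no other request is being processed.
        if (numRequestsProcessing.get() == 0 && !committedRequests.isEmpty()) {
            Req c = committedRequests.poll();
            Req p = nextPending.get();
            if (p != null && p.sessionId == c.sessionId && p.cxid == c.cxid) {
                nextPending.set(null); // the commit matches our local write
            }
            writeInFlight = c;
            forward(c);
        }
    }

    private void forward(Req r) {
        numRequestsProcessing.incrementAndGet();
        forwarded.add(r);
    }

    /** Called when the rest of the pipeline has finished with a request. */
    void finished(Req r) {
        numRequestsProcessing.decrementAndGet();
        if (r == writeInFlight) {
            writeInFlight = null;
        }
    }

    public static void main(String[] args) {
        CommitMatchSketch s = new CommitMatchSketch();
        Req read = new Req(1L, 1, false);
        Req write = new Req(1L, 2, true);
        s.queuedRequests.add(read);
        s.queuedRequests.add(write);
        s.step();                         // read forwarded, write parked in nextPending
        s.committedRequests.add(new Req(1L, 2, true));
        s.finished(read);                 // pipeline drained, commit may now proceed
        s.step();
        System.out.println("forwarded requests: " + s.forwarded.size());
    }
}
```

In this model a commit that does not match `nextPending` (for example, a write that originated elsewhere) is still forwarded, and the locally pending write keeps waiting for its own commit.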
    • Method Detail

      • needCommit

        protected boolean needCommit(Request request)
      • run

        public void run()
        Specified by:
        run in interface java.lang.Runnable
        Overrides:
        run in class java.lang.Thread
      • processCommitted

        protected void processCommitted()
      • start

        public void start()
        Overrides:
        start in class java.lang.Thread
      • commit

        public void commit(Request request)