Class CommitProcessor
- java.lang.Object
-
- java.lang.Thread
-
- org.apache.zookeeper.server.ZooKeeperThread
-
- org.apache.zookeeper.server.ZooKeeperCriticalThread
-
- org.apache.zookeeper.server.quorum.CommitProcessor
-
- All Implemented Interfaces:
java.lang.Runnable
,RequestProcessor
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor
This RequestProcessor matches the incoming committed requests with the locally submitted requests. The trick is that locally submitted requests that change the state of the system will come back as incoming committed requests, so we need to match them up. The CommitProcessor is multi-threaded. Communication between threads is handled via queues, atomics, and wait/notifyAll synchronized on the processor. The CommitProcessor acts as a gateway for allowing requests to continue with the remainder of the processing pipeline. It will allow many read requests but only a single write request to be in flight simultaneously, thus ensuring that write requests are processed in transaction id order. - 1 commit processor main thread, which watches the request queues and assigns requests to worker threads based on their sessionId so that read and write requests for a particular session are always assigned to the same thread (and hence are guaranteed to run in order). - 0-N worker threads, which run the rest of the request processor pipeline on the requests. If configured with 0 worker threads, the primary commit processor thread runs the pipeline directly. Typical (default) thread counts are: on a 32 core machine, 1 commit processor thread and 32 worker threads. Multi-threading constraints: - Each session's requests must be processed in order. - Write requests must be processed in zxid order - Must ensure no race condition between writes in one session that would trigger a watch being set by a read request in another session The current implementation solves the third constraint by simply allowing no read requests to be processed in parallel with write requests.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class java.lang.Thread
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler
-
Nested classes/interfaces inherited from interface org.apache.zookeeper.server.RequestProcessor
RequestProcessor.RequestProcessorException
-
-
Field Summary
Fields Modifier and Type Field Description protected java.util.concurrent.LinkedBlockingQueue<Request>
committedRequests
Requests that have been committed.protected java.util.concurrent.atomic.AtomicReference<Request>
nextPending
Request for which we are currently awaiting a commitprotected java.util.concurrent.atomic.AtomicInteger
numRequestsProcessing
The number of requests currently being processedprotected java.util.concurrent.LinkedBlockingQueue<Request>
queuedRequests
Requests that we are holding until the commit comes in.protected boolean
stopped
protected WorkerService
workerPool
static java.lang.String
ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS
Default: numCoresstatic java.lang.String
ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT
Default worker pool shutdown timeout in ms: 5000 (5s)
-
Constructor Summary
Constructors Constructor Description CommitProcessor(RequestProcessor nextProcessor, java.lang.String id, boolean matchSyncs, ZooKeeperServerListener listener)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
commit(Request request)
protected boolean
needCommit(Request request)
protected void
processCommitted()
void
processRequest(Request request)
void
run()
void
shutdown()
void
start()
-
Methods inherited from class org.apache.zookeeper.server.ZooKeeperCriticalThread
handleException
-
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, stop, suspend, toString, yield
-
-
-
-
Field Detail
-
ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS
public static final java.lang.String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS
Default: numCores- See Also:
- Constant Field Values
-
ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT
public static final java.lang.String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT
Default worker pool shutdown timeout in ms: 5000 (5s)- See Also:
- Constant Field Values
-
queuedRequests
protected final java.util.concurrent.LinkedBlockingQueue<Request> queuedRequests
Requests that we are holding until the commit comes in.
-
committedRequests
protected final java.util.concurrent.LinkedBlockingQueue<Request> committedRequests
Requests that have been committed.
-
nextPending
protected final java.util.concurrent.atomic.AtomicReference<Request> nextPending
Request for which we are currently awaiting a commit
-
numRequestsProcessing
protected java.util.concurrent.atomic.AtomicInteger numRequestsProcessing
The number of requests currently being processed
-
stopped
protected volatile boolean stopped
-
workerPool
protected WorkerService workerPool
-
-
Constructor Detail
-
CommitProcessor
public CommitProcessor(RequestProcessor nextProcessor, java.lang.String id, boolean matchSyncs, ZooKeeperServerListener listener)
-
-
Method Detail
-
needCommit
protected boolean needCommit(Request request)
-
run
public void run()
- Specified by:
run
in interfacejava.lang.Runnable
- Overrides:
run
in classjava.lang.Thread
-
processCommitted
protected void processCommitted()
-
start
public void start()
- Overrides:
start
in classjava.lang.Thread
-
commit
public void commit(Request request)
-
processRequest
public void processRequest(Request request)
- Specified by:
processRequest
in interfaceRequestProcessor
-
shutdown
public void shutdown()
- Specified by:
shutdown
in interfaceRequestProcessor
-
-