High Availability for Mutable Shared State
Mutable shared state is the root of all evil in concurrent systems. The history of concurrent computation is basically the story of approaches to managing mutable shared state. The thread model, which has long held the dominant position, leads to intractable complexity [1].
The actor model captures state in the behavior of an actor. An actor’s behavior may change over time, representing changing state. However, this “state” may only be observed through the actor’s responses to asynchronous messages. Concurrent actors react to the reception of asynchronous messages, processing them in non-deterministic arrival order. Each message reception may cause a change in the behavior/state of the actor, but this change will only become visible through the actor’s response to subsequent messages. In essence, message reception and processing is the atomic transactional unit of state change in an actor system.
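As a minimal sketch of this idea (not part of the database developed below), consider a counter whose only “state” is the count captured in its behavior. The #inc and #value message tags are chosen here purely for illustration.

LET counter_beh(count) = \msg.[
    CASE msg OF
    (cust, #inc) : [
        BECOME counter_beh(add(count, 1))
        SEND SELF TO cust
    ]
    (cust, #value) : [
        SEND count TO cust
    ]
    END
]

An #inc request atomically replaces the behavior with one that captures the incremented count, while a #value request reveals the count only through its asynchronous reply. No observer ever sees a partially-updated count.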
Naive Database
The actor approach to shared mutable state can be illustrated by a naive “database” implementation. The “readers/writers problem” is to keep the database consistent and available when there are multiple concurrent requests to read/write the database.
Our database protocol consists of #read and #write requests. A #read request includes a query function that computes a result based on the state of the database. A #write request includes an update function that computes a new state for the database based on the current state. Both requests produce asynchronous replies indicating completion of the request. A #read request sends the computed query result, and a #write request sends the identity of the database actor.
LET database_beh(state) = \msg.[
    CASE msg OF
    (cust, #read, query) : [
        SEND query(state) TO cust
    ]
    (cust, #write, update) : [
        BECOME database_beh(update(state))
        SEND SELF TO cust
    ]
    END
]
Each request represents an atomic transaction on the database. Requests are serialized by the reception ordering rules of the actor model. Multiple concurrent requests are thus handled in a non-deterministic order, but no request can interfere with another. Consistency of the database is maintained across concurrent requests.
If one request is dependent on the completion of another, we must wait for the asynchronous response to be sure that the prior request has completed. Note, however, that additional requests may be processed before our next request is received, so the state of the database may not be simply the result of our last update.
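For example, a customer that must read its own write could use the #write reply as the trigger for the dependent #read. The following sketch (not part of the article's code) assumes a database actor db created with database_beh, as in the test fixture later in this article.

CREATE follow_up WITH \db'.[
    SEND (println, #read, \x.x) TO db'
]
SEND (follow_up, #write, \x.add(x, 1)) TO db

The dependent #read is only sent after the reply confirming that the #write has been processed, although requests from other customers may still be interleaved before it arrives.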
What makes this database naive is that we are ignoring the processing time required to compute the results of query and update functions. No further requests can be received while these functions are being applied to the state of the database, which can take an arbitrary amount of time. In the worst case, these functions may not terminate, making the database permanently unresponsive. We can ensure consistency, but not availability.
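To make the failure mode concrete, here is an illustration (not from the original example code) of a divergent query submitted to the naive database:

LET diverge(x) = diverge(x)

CREATE db WITH database_beh(0)
SEND (println, #read, diverge) TO db

The naive database applies diverge to its state while processing this single message, so it can never finish, and every subsequent request waits forever.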
High Availability
Our strategy to maintain availability involves separating the query and update computations from the handling of concurrent requests. The database still maintains the consistent state. New actors are created to perform query and update computations.
Ready Database
The database is “ready” when there are no #write requests in progress. A #read request cannot change the state of the database, but computation of the query function takes an arbitrary amount of time. Therefore, we create a reader actor that performs the query computation on the state, and eventually sends the result to the original customer.
LET database_beh(state) = \msg.[
    CASE msg OF
    (cust, #read, query) : [
        CREATE reader WITH \_.[
            SEND query(state) TO cust
        ]
        SEND () TO reader
    ]
    ...
    END
]
All actor computation is initiated by reception of a message. The information our reader actor requires is already in scope, so we initiate processing by sending an empty message. The reader computes the query from the current state and sends the result to cust. The database is immediately available to handle further requests. The reader processes the query asynchronously. The original customer will receive an asynchronous reply when (and if) the query function completes. Processing a query does not impact the availability of the database for concurrent requests.
So what about updates? We also want to avoid having updates interfere with availability. We accomplish this by separating the computation of the new state from the update of the observable state of the database. While the new state is being computed, the current state remains consistently available. However, we also must prevent overlapping updates, so subsequent update requests are queued while an update is being computed. This effectively reduces the availability of the database for writers, relative to other writers, but not readers.
LET database_beh(state) = \msg.[
    CASE msg OF
    ...
    (cust, #write, update) : [
        CREATE writer WITH writer_beh(cust, update, state)
        SEND SELF TO writer
        BECOME locked_db_beh(writer, state, q-empty)
    ]
    END
]

LET writer_beh(cust, update, state) = \db.[
    SEND (SELF, #update, update(state), cust) TO db
]
When a #write request is received, a new writer actor is created to asynchronously compute the new state by applying the update function to the current state. Computation in the writer is initiated by a message containing the identity of the database itself. The database becomes “locked”, since there is now a #write in progress, with an initially empty queue of deferred writers.
Banker’s Queue
In previous articles, when we needed to defer customers, we often used a simple stack built by pairing new items with a list of items previously deferred. While this can be somewhat justified on the basis of indeterminate delays in asynchronous message passing, this inherently unfair strategy could lead to starvation of some customers. To restore fairness, we would prefer to use a queue (FIFO) rather than a stack (LIFO). We could use a Finger Tree for our queue, but that would be overly complicated since all we need is simple queue semantics. Instead, we will use a persistent functional data structure called a Banker’s Queue, which offers amortized O(1) performance [2].
LET push-pop(s', s) = (
    CASE s OF
    NIL : s'
    (x, xs) : push-pop((x, s'), xs)
    END
)
LET reverse(s) = push-pop(NIL, s)

LET q-empty = (NIL, NIL)
LET q-norm(p, q) = (
    CASE p OF
    NIL : (reverse(q), NIL)
    _ : (p, q)
    END
)
LET q-put((p, q), x) = q-norm(p, (x, q))
LET q-take(p, q) = (
    CASE p OF
    NIL : (?)
    (h, t) : (h, q-norm(t, q))
    END
)
A Banker’s Queue maintains a pair of stacks. The “front” stack p contains elements ready to be taken from the queue, from first to last. The “back” stack q contains elements put on the queue, from last to first. The queue is normalized by ensuring that the “front” stack is only empty when the queue is empty. If the “front” stack is empty the normalization function q-norm transfers items from the “back” stack to the “front” stack, reversing their order in the process. Note that a successful call to the q-take function returns a pair consisting of the element taken, and the new state of the queue. Once again, there is no mutable state involved. All values are immutable. The q-put and q-take functions generate new values for each queue state.
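As a small worked example (not part of the original), the following definitions trace the queue through two puts and a take:

LET q0 = q-empty
LET q1 = q-put(q0, 1)
LET q2 = q-put(q1, 2)
LET (first, q3) = q-take(q2)

Here q0 is (NIL, NIL); q1 is ((1, NIL), NIL), because normalization reverses the “back” stack into the empty “front” stack; q2 is ((1, NIL), (2, NIL)); and q-take yields first = 1 together with q3 = ((2, NIL), NIL) after re-normalization.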
Locked Database
The database is “locked” when there is a #write request in progress. The database remembers the identity of the writer that is computing an update. While an updated state is being computed, concurrent #read requests are still satisfied using the current state of the database. The strategy for handling #read requests is the same as we used while “ready”.
LET locked_db_beh(writer, state, waiting) = \msg.[
    CASE msg OF
    (cust, #read, query) : [
        CREATE reader WITH \_.[
            SEND query(state) TO cust
        ]
        SEND () TO reader
    ]
    ...
    END
]
If additional #write requests arrive while an update is in progress, they are put on the back of the waiting queue. The database immediately becomes available to process more requests.
LET locked_db_beh(writer, state, waiting) = \msg.[
    CASE msg OF
    ...
    (cust, #write, update) : [
        BECOME locked_db_beh(writer, state, q-put(waiting, (cust, update)))
    ]
    ...
    END
]
While “locked”, the database is prepared to receive an #update request from the writer that is computing the new state. The writer was created by the database, so the database is the only actor (besides the writer) that knows the identity of the writer. Thus the writer identity serves as a security token which only the writer can provide. In the code below, the $writer pattern matches only the identity bound to writer, so an #update message from any other actor will not match.
LET locked_db_beh(writer, state, waiting) = \msg.[
    CASE msg OF
    ...
    ($writer, #update, state', cust) : [
        CASE waiting OF
        $q-empty : [
            BECOME database_beh(state')
        ]
        _ : [
            LET ((cust', update'), waiting') = $q-take(waiting)
            CREATE writer' WITH writer_beh(cust', update', state')
            SEND SELF TO writer'
            BECOME locked_db_beh(writer', state', waiting')
        ]
        END
        SEND SELF TO cust
    ]
    END
]
When the writer has computed a new state', it sends an #update message tagged with its own identity, and includes the customer cust of the original #write request. If the waiting queue is empty, the database simply becomes “ready” with the new state' value. If the waiting queue is not empty, a waiting customer cust' and update' function are taken from the front of the queue. A new writer' is created to compute the next update, and the identity of the database is sent to initiate processing. The database remains “locked”, but the writer' in progress, the state' and any remaining waiting' requests are updated. In any case, the database identity is sent to the original customer cust as a signal that the #write has completed.
Test Fixture
With these behaviors in place, we can create a database with a simple integer state and send it a few concurrent requests. We use the built-in timer actor to arrange for delayed delivery of some requests (each timer request is a triple of a delay in milliseconds, the message to deliver, and the target actor), presuming that previous outstanding requests will all have completed by the time the delayed messages are delivered.
CREATE db WITH database_beh(0)
SEND (println, #write, \x.add(x, 1)) TO db
SEND (println, #read, \x.(-1, x)) TO db
SEND (println, #read, \x.(-2, x)) TO db
SEND (println, #read, \x.(-3, x)) TO db
SEND (1000, (println, #write, \x.add(x, 2)), db) TO timer
SEND (1000, (println, #read, \x.(-4, x)), db) TO timer
SEND (1000, (println, #write, \x.add(x, 4)), db) TO timer
SEND (2000, (println, #read, \x.(-5, x)), db) TO timer
There is considerable opportunity for non-determinism in this example. The database is created with an integer state of zero (0). Three batches of concurrent requests are sent one second apart.
The first batch includes a #write request and three #read requests. The #write request computes an updated state by adding one (1) to the current state. The #read requests each label the current state with a distinct negative-numbered prefix (so we can identify the requests). Some of the #read requests may get the initial value (0) and some may get the updated value (1), depending on non-deterministic arrival ordering of the requests.
The second batch includes two #write requests and a #read request. The #write requests add two (2) and four (4) respectively to the current state. Again, depending on the non-deterministic arrival order of the requests, the #read request may observe the value 1, 3, 5 or 7 (that is, the value after neither, either one, or both of the updates has been applied).
The third batch includes only a #read request, which will observe the final value of 7.
Conclusion
Shared mutable state can be represented by the behavior of an actor. The transactional semantics of actor message reception and processing ensure consistency. Availability can be enhanced by off-loading computation to dynamically-created subordinate actors. Fairness of serialized updates is provided by a queue for deferred requests. This addresses two of the three pillars of Brewer’s CAP Theorem [3]. Partition tolerance is enhanced by the encapsulation of query and update computations as part of #read and #write requests. Customers can become partitioned from the database without affecting the availability of the database to non-partitioned customers.
References
[1] E. Lee. The Problem with Threads. Computer, vol. 39, no. 5, pp. 33-42, May 2006.
[2] C. Okasaki. Purely Functional Data Structures. Cambridge University Press, 1998.
[3] S. Gilbert, N. Lynch. Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services. ACM SIGACT News, vol. 33, no. 2, 2002.
Tags: actor, asynchronous, availability, blocking, consistency, queue, scalability, serializer, starvation