2009-03-07

Simple Future

A future is a computation that encapsulates the notion of asynchronously computing a value. Let me deconstruct the buzzwords:

  • By "computation" I mean a mechanism that, eventually, reduces to or contains a value
  • By "value" I mean a primitive object that cannot be reduced further, or rather a computation that is trivial in the sense that it returns itself when run
  • By "asynchronous" I mean that the context of the computation doesn't have to wait for it to complete before continuing

By applying Moggi's insight, you can view futures as a monad, with 'a future encapsulating values of type 'a. Two important operations distinguish futures from any other monad. The first attempts to "redeem", or unwrap, the encapsulated value, blocking if its computation has not yet finished:

val redeem : 'a future -> 'a

The second constructs a future from a function by running it asynchronously in a separate thread:

val delay : ('a -> 'b) -> ('a -> 'b future)

(I use extra parentheses to emphasize the functorial aspects of computation). It is important that futures be composable, in the sense that there must exist an operation that combines a list of futures into a single future:

val select : 'a future list -> 'a future

with the strong condition that the returned future is the first in the list to complete, so that it can be redeemed without waiting. This operation is sufficiently powerful to construct an efficient polling operation on a future (that is, testing in constant time whether a future is immediately redeemable or not) as a derived operation. Of course, a very inefficient selection operation can also be built based on polling, so that the two primitives are semantically inter-definable.
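The "very inefficient" direction can be sketched as follows. This is only an illustration under stated assumptions: the POLLABLE signature, the SelectOfPoll functor and the toy Cell module are hypothetical names that do not appear elsewhere in this post, and the selection simply busy-waits, scanning the list left to right until some future reports that it is ready.

```ocaml
(* Hypothetical minimal interface: all that is assumed is a poll operation. *)
module type POLLABLE = sig
  type 'a t
  val poll : 'a t -> bool
end

module SelectOfPoll (P : POLLABLE) = struct
  (* Busy-wait: rescan the list until some future is ready.
     This never returns if no future ever completes. *)
  let select = function
    | [] -> failwith "select"
    | l ->
      let rec spin () =
        match List.find_opt P.poll l with
        | Some f -> f
        | None -> spin ()
      in
      spin ()
end

(* Toy stand-in for a future: a cell that is either empty or filled. *)
module Cell = struct
  type 'a t = 'a option ref
  let poll c = Option.is_some !c
end

module S = SelectOfPoll (Cell)

(* The first cell is still empty, so the second one is selected. *)
let first = S.select [ ref None; ref (Some 2); ref (Some 3) ]
```

The loop burns a processor while it waits, which is why a blocking select is the better primitive to build on.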

Given that futures form a monad, they come equipped with the usual operations:

val unit : 'a -> 'a future
val bind : ('a -> 'b future) -> ('a future -> 'b future)
val fmap : ('a -> 'b) -> ('a future -> 'b future)

(If the type of bind seems puzzling, it's because I prefer working with Kleisli triples. Again, I've parenthesized the declarations in order to highlight the categorial aspects of the operations on futures.) The key point is that these operations must "run in the future" and not block the calling thread. Then, poll can be written as:

let poll f = redeem (select [
  fmap (fun _ -> true) f;
  unit false
])

The first element is a future that replaces the result of running f with true. The second future immediately returns false. If the first is already completed, select would return its true value. Since futures are immutable, f is still available for redeeming. If, on the other hand, the first is not yet completed, select would return the second future, by the strong guarantee of deterministic (left-to-right) parallelism, and would return false on the spot.

So, let's build a minimal interface for a future. First let's start with a monad:

module type MONAD = sig
  type 'a t
  val unit : 'a -> 'a t
  val bind : ('a -> 'b t) -> ('a t -> 'b t)
  val fmap : ('a -> 'b) -> ('a t -> 'b t)
end

Of course fmap is derivable in a monad because every monad is a (categorial) functor, but it is simple to include it in the implementation, and it's often more efficient to use a specialized version of it. Now, the futures themselves:

module type FUTURE = sig
  include MONAD
  val redeem : 'a t -> 'a
  val delay  : ('a -> 'b) -> ('a -> 'b t)
  val select : 'a t list -> 'a t
end
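The remark that fmap is derivable can be made concrete with a small sketch. The names BASE, WithFmap and OptionMonad are hypothetical and used only for illustration; the option monad stands in for any structure that provides unit and the Kleisli-style bind used in this post.

```ocaml
(* A monad given only by unit and a Kleisli-style bind. *)
module type BASE = sig
  type 'a t
  val unit : 'a -> 'a t
  val bind : ('a -> 'b t) -> ('a t -> 'b t)
end

(* fmap lifts f by binding to "apply f, then re-wrap with unit". *)
module WithFmap (M : BASE) = struct
  include M
  let fmap f = bind (fun x -> unit (f x))
end

(* The option monad as a simple test subject. *)
module OptionMonad = struct
  type 'a t = 'a option
  let unit x = Some x
  let bind f = function None -> None | Some x -> f x
end

module Opt = WithFmap (OptionMonad)

let some_case = Opt.fmap succ (Some 41)
let none_case = Opt.fmap succ None
```

For futures, a derived fmap of this kind goes through both bind and unit, which is one reason a specialized version can be cheaper.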

To derive poll I can use a functor:

module Future (F : FUTURE) = struct
  include F
  let poll f = redeem (select [
    fmap (fun _ -> true) f;
    unit false
  ])
end

With this specification, the minimal semantics for futures is to perform completely synchronous computations; a valid, although not very interesting starting point.

module NullFuture = Future(struct
  type 'a t = 'a
  let unit x = x
  let bind f x = f x
  let fmap f x = f x
  let redeem x = x
  let delay f x = f x
  let select = function [] -> failwith "select" | x :: _ -> x
end)
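To see what the synchronous semantics amounts to, here is a self-contained sketch that repeats the signatures and the functor from above and then exercises NullFuture. The values f, ready, value and chained are illustrative names only.

```ocaml
module type MONAD = sig
  type 'a t
  val unit : 'a -> 'a t
  val bind : ('a -> 'b t) -> ('a t -> 'b t)
  val fmap : ('a -> 'b) -> ('a t -> 'b t)
end

module type FUTURE = sig
  include MONAD
  val redeem : 'a t -> 'a
  val delay  : ('a -> 'b) -> ('a -> 'b t)
  val select : 'a t list -> 'a t
end

module Future (F : FUTURE) = struct
  include F
  let poll f = redeem (select [ fmap (fun _ -> true) f; unit false ])
end

module NullFuture = Future (struct
  type 'a t = 'a
  let unit x = x
  let bind f x = f x
  let fmap f x = f x
  let redeem x = x
  let delay f x = f x
  let select = function [] -> failwith "select" | x :: _ -> x
end)

(* delay runs the function on the spot, so the future is born complete. *)
let f = NullFuture.delay (fun n -> n * n) 7

(* select always picks the head of the list, so poll is always true. *)
let ready = NullFuture.poll f

let value = NullFuture.redeem f
let chained = NullFuture.(redeem (bind (fun n -> unit (n + 1)) f))
```

Since every future is already complete when it is created, poll can never answer false here.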

For a truly multi-threaded experience, I must call in the services of the Thread module. The problem with OCaml's threads is twofold. First, threads don't return a value, but just run a function unattended until completion, discarding the result. This is remedied easily enough by providing the thread with a reference in which to deliver its result. The other, more serious problem is that there's no built-in way to wait on more than one thread.
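The first remedy can be sketched in a few lines. The helper run_and_collect is a hypothetical name, and the sketch assumes the program is linked against the threads library (for instance with ocamlfind and the threads.posix package).

```ocaml
(* Assumes the threads library is linked, e.g.
   ocamlfind ocamlopt -package threads.posix -linkpkg demo.ml *)

(* Run f x in a new thread and hand the result back through a reference. *)
let run_and_collect f x =
  let result = ref None in
  let t = Thread.create (fun x -> result := Some (f x)) x in
  (* join blocks until this one thread has terminated *)
  Thread.join t;
  !result

let answer = run_and_collect (fun n -> n * 2) 21
```

Thread.join waits for exactly one thread, which is the second problem: nothing here lets a caller wait for whichever of several threads finishes first. If f raises, the reference is left at None.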

Both drawbacks can be overcome by associating the references with a condition variable and waiting for the first thread that signals it, in effect creating a synchronizing slot. I mostly follow Bartosz Milewski's description of MVars, but I depart from his implementation in that slots are more restricted. First, the interface is simple enough:

module Slot : sig
  type 'a t
  val create : unit -> 'a t
  val put : 'a -> 'a t -> unit
  val get : 'a t -> 'a
end = struct

The idea is that get waits for a value to be present, but put has an effect only if the slot is empty. To wait on a value, I use a Condition.t that must always be paired with a Mutex.t:

  type 'a t = { m : Mutex.t; c : Condition.t; mutable ref : 'a option }

Creating a slot is simple enough:

  let create () = { m = Mutex.create (); c = Condition.create (); ref = None }

In order to ensure that all accesses to a slot are synchronized, and that the lock is properly released even in the event of failure, I use a wrapper:

  let locked f s =
    Mutex.lock s.m;
    try let x = f s in Mutex.unlock s.m; x
    with e -> Mutex.unlock s.m; raise e

Putting a value in the slot succeeds only if the slot is empty; otherwise, it has no effect:

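  let put x =
    locked (fun s ->
      if s.ref = None then begin
        (* broadcast, not signal: several threads may be blocked in get,
           and all of them must wake up once the slot is full *)
        s.ref <- Some x; Condition.broadcast s.c
      end)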

Getting from an empty slot blocks until another thread puts a value into it; there is no wait involved when it is full:

  let get s =
    let rec go s = match s.ref with
    | None   -> Condition.wait s.c s.m; go s
    | Some x -> x
    in locked go s
end

The net effect is that a slot becomes immutable once set. This is key to ensuring that select has fair semantics and to avoiding race conditions.
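The write-once behaviour can be checked without spawning any thread. The sketch below repeats the Slot module so that it stands alone, with one assumption of its own: put uses Condition.broadcast so that every thread blocked in get is woken. The values s and v are illustrative names. On compilers older than OCaml 5.0, Mutex and Condition come from the threads library, which must then be linked.

```ocaml
module Slot : sig
  type 'a t
  val create : unit -> 'a t
  val put : 'a -> 'a t -> unit
  val get : 'a t -> 'a
end = struct
  type 'a t = { m : Mutex.t; c : Condition.t; mutable ref : 'a option }

  let create () = { m = Mutex.create (); c = Condition.create (); ref = None }

  (* Run f with the slot's mutex held, releasing it even on failure. *)
  let locked f s =
    Mutex.lock s.m;
    try let x = f s in Mutex.unlock s.m; x
    with e -> Mutex.unlock s.m; raise e

  (* Only the first put has any effect. *)
  let put x =
    locked (fun s ->
      if s.ref = None then begin
        s.ref <- Some x; Condition.broadcast s.c
      end)

  (* Block while the slot is empty; return at once when it is full. *)
  let get s =
    let rec go s = match s.ref with
    | None   -> Condition.wait s.c s.m; go s
    | Some x -> x
    in locked go s
end

let s = Slot.create ()
let () = Slot.put 1 s
let () = Slot.put 2 s   (* ignored: the slot is already full *)
let v = Slot.get s      (* does not block: the slot is full *)
```

The second put is silently dropped, so v is the value stored by the first one.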

Now futures must wait on the slot to be filled with the result of the computation, which must proceed in a separate thread if it is to be truly asynchronous:

module AsyncFuture = Future(struct
  type 'a t = { t : Thread.t; s : 'a Slot.t }

Redeeming a future is simply waiting on its slot to become full:

  let redeem m = Slot.get m.s

In order to delay a computation by turning it into a future, spawn a thread that runs it and deposits its value in the corresponding slot:

  let delay f x =
    let s = Slot.create () in
    { t = Thread.create (fun x -> Slot.put (f x) s) x; s = s }

That's it. Now selecting from a set of futures is a bit more involved. Of course, it makes no sense to treat an empty list of futures as anything else but an error. Otherwise, I turn every future into a delayed computation that runs it to completion and signals termination by setting a slot with a reference to itself. The semantics for slots ensure that the first future to complete will be the one that will be present as the slot's value:

  let select = function
  | [] -> failwith "select"
  | l  ->
    let s = Slot.create () in
    List.iter (fun m ->
      ignore (Thread.create (fun m -> ignore (redeem m); Slot.put m s) m))
      l;
    Slot.get s

As a slight optimization I directly create Threads instead of delaying the thunks, because the results are completely uninteresting.

Finally, the monadic semantics is very simple to provide with these primitives:

  let unit x =
    let s = Slot.create () in Slot.put x s;
    { t = Thread.self (); s = s }

  let bind f m = delay (fun m -> redeem (f (redeem m))) m
  let fmap f m = delay (fun m -> f (redeem m)) m
end)

I had originally written unit as delay id, but since the value is already present it barely needs a slot, much less a thread, so I use the current thread as a placeholder in the future's record. If the slot had to be waited upon, this could lead to deadlock; since it is already full, there is no danger of the current thread blocking on it.
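Putting the pieces together, a complete program might look like the sketch below. It is an assembly of the fragments above under a few assumptions: it must be linked against the threads library, put uses Condition.broadcast because several threads can end up waiting on the same slot, the module is sealed with a flattened FUTURE signature instead of going through the Future functor, and the values a, b, c, results and winner are illustrative names.

```ocaml
(* Assumes the threads library is linked, e.g.
   ocamlfind ocamlopt -package threads.posix -linkpkg futures.ml *)
module type FUTURE = sig
  type 'a t
  val unit : 'a -> 'a t
  val bind : ('a -> 'b t) -> ('a t -> 'b t)
  val fmap : ('a -> 'b) -> ('a t -> 'b t)
  val redeem : 'a t -> 'a
  val delay : ('a -> 'b) -> ('a -> 'b t)
  val select : 'a t list -> 'a t
end

module Slot : sig
  type 'a t
  val create : unit -> 'a t
  val put : 'a -> 'a t -> unit
  val get : 'a t -> 'a
end = struct
  type 'a t = { m : Mutex.t; c : Condition.t; mutable ref : 'a option }
  let create () = { m = Mutex.create (); c = Condition.create (); ref = None }
  let locked f s =
    Mutex.lock s.m;
    try let x = f s in Mutex.unlock s.m; x
    with e -> Mutex.unlock s.m; raise e
  let put x =
    locked (fun s ->
      (* wake every waiter: a future may be redeemed by several threads *)
      if s.ref = None then begin
        s.ref <- Some x; Condition.broadcast s.c
      end)
  let get s =
    let rec go s = match s.ref with
    | None   -> Condition.wait s.c s.m; go s
    | Some x -> x
    in locked go s
end

module AsyncFuture : FUTURE = struct
  type 'a t = { t : Thread.t; s : 'a Slot.t }
  let redeem m = Slot.get m.s
  let delay f x =
    let s = Slot.create () in
    { t = Thread.create (fun x -> Slot.put (f x) s) x; s = s }
  let select = function
    | [] -> failwith "select"
    | l ->
      let s = Slot.create () in
      List.iter (fun m ->
        ignore (Thread.create (fun m -> ignore (redeem m); Slot.put m s) m))
        l;
      Slot.get s
  let unit x =
    let s = Slot.create () in
    Slot.put x s;
    { t = Thread.self (); s = s }
  let bind f m = delay (fun m -> redeem (f (redeem m))) m
  let fmap f m = delay (fun m -> f (redeem m)) m
end

(* a computes 7 * 7; b and c are chained onto it without blocking. *)
let a = AsyncFuture.delay (fun n -> n * n) 7
let b = AsyncFuture.fmap succ a
let c = AsyncFuture.bind (fun n -> AsyncFuture.delay (fun k -> k * 2) n) b

(* Redeeming blocks the main thread until each value is available. *)
let results = List.map AsyncFuture.redeem [ a; b; c ]

(* All three are complete by now, so any of them may win the race. *)
let winner = AsyncFuture.(redeem (select [ a; b; c ]))
```

Here the slot of a is read by the main thread, by the thread behind b and by a thread inside select. Note also that if the delayed function raises, its slot is never filled and redeem blocks forever; handling that is outside the scope of this sketch.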

This implementation is low-level enough that it can be used as a guide for implementing composable futures in other languages. In fact, Apocalisp has a series of articles presenting this very design applied to Java. Of course, this is far more basic than that, in that there is no provision for a strategy for running threads, which are in fact paired one-to-one with thunks. Building a work queue, a thread pool to run it and layering a future library on that is left as an exercise for the reader.
