Coroutines
I tend to think of a coroutine as a communicating process. A producer is a coroutine that emits values. A consumer is a coroutine that receives values. A transducer is a coroutine that both emits and receives. It's easy to represent coroutines as threads in Scala:
trait Coroutine extends Runnable {
  def start(): Unit = {
    val myThread = new Thread(this)
    myThread.start()
  }
}
(It's also possible to use continuations, which Scala now has, to build coroutines.)
A simple (only mildly inefficient) implementation of consumers, producers and transducers uses thread-safe queues to buffer inputs and outputs.
Producers call put() inside an infinite loop in their run() method when they have a value to emit:
import java.util.concurrent.ArrayBlockingQueue

/**
 An O-producer is a coroutine that produces
 type-O objects for consumption by other
 coroutines.
 */
trait Producer[O] extends Coroutine {

  // Bounded, thread-safe buffer; put() blocks while it is full.
  private val outputs =
    new ArrayBlockingQueue[O](1024)

  /**
   Called by the coroutine's run
   method when it has produced a new output.
   */
  protected def put(output: O): Unit = {
    outputs.put(output)
  }

  /**
   Called by a downstream consumer when it
   wants a new value from this coroutine.
   */
  def next(): O = {
    outputs.take()
  }

  /**
   Composes this producing coroutine
   with a transducing coroutine.

   @return A fused producing coroutine.
   */
  def ==>[O2](t: Transducer[O, O2]): Producer[O2] = {
    val that = this
    new Producer[O2] {
      def run(): Unit = {
        while (true) {
          // Feed one input to t, then wait for one output from it.
          val o = that.next()
          t.accept(o)
          put(t.next())
        }
      }

      override def start(): Unit = {
        that.start()
        t.start()
        super.start()
      }
    }
  }

  /**
   Composes this producing coroutine
   with a consuming coroutine.

   @return A fused coroutine.
   */
  def ==>(consumer: Consumer[O]): Coroutine = {
    val that = this
    new Coroutine {
      def run(): Unit = {
        while (true) {
          val o = that.next()
          consumer.accept(o)
        }
      }

      override def start(): Unit = {
        that.start()
        consumer.start()
        super.start()
      }
    }
  }
}
Consumers call get() inside their run() method when they need the next input:
/**
 An I-consumer is a coroutine that
 consumes type-I objects.
 */
trait Consumer[I] extends Coroutine {

  private val inputs =
    new ArrayBlockingQueue[I](1024)

  /**
   Called when an external I-producer
   has a new input to provide.
   */
  def accept(input: I): Unit = {
    inputs.put(input)
  }

  /**
   Called by the coroutine itself when
   it needs the next input.
   */
  protected def get(): I = {
    inputs.take()
  }
}
Transducers are literally both a consumer and a producer:
/**
 An I,O-transducer consumes type-I objects
 and produces type-O objects.
 */
trait Transducer[I,O] extends Consumer[I] with Producer[O]
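To see how the three traits fit together, here is a small self-contained sketch of a pipeline. It restates the traits in condensed form so that it runs on its own. The names Numbers, Square, Collect and runDemo are hypothetical and are not part of the web server. The threads are marked as daemons, which is an assumption of this sketch (the trait above does not do this) so that the JVM can exit even though the stages loop forever.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

object PipelineDemo {

  // Condensed restatement of the traits above.
  // Assumption of this sketch: threads are daemons so the JVM can exit.
  trait Coroutine extends Runnable {
    def start(): Unit = {
      val thread = new Thread(this)
      thread.setDaemon(true)
      thread.start()
    }
  }

  trait Producer[O] extends Coroutine {
    private val outputs = new ArrayBlockingQueue[O](1024)
    protected def put(output: O): Unit = outputs.put(output)
    def next(): O = outputs.take()

    def ==>[O2](t: Transducer[O, O2]): Producer[O2] = {
      val that = this
      new Producer[O2] {
        def run(): Unit = while (true) { t.accept(that.next()); put(t.next()) }
        override def start(): Unit = { that.start(); t.start(); super.start() }
      }
    }

    def ==>(consumer: Consumer[O]): Coroutine = {
      val that = this
      new Coroutine {
        def run(): Unit = while (true) consumer.accept(that.next())
        override def start(): Unit = { that.start(); consumer.start(); super.start() }
      }
    }
  }

  trait Consumer[I] extends Coroutine {
    private val inputs = new ArrayBlockingQueue[I](1024)
    def accept(input: I): Unit = inputs.put(input)
    protected def get(): I = inputs.take()
  }

  trait Transducer[I, O] extends Consumer[I] with Producer[O]

  // Hypothetical producer: emits 1..n, then its thread finishes.
  class Numbers(n: Int) extends Producer[Int] {
    def run(): Unit = for (i <- 1 to n) put(i)
  }

  // Hypothetical transducer: emits exactly one square per input.
  class Square extends Transducer[Int, Int] {
    def run(): Unit = while (true) { val x = get(); put(x * x) }
  }

  // Hypothetical consumer: copies every input into a shared queue.
  class Collect(sink: LinkedBlockingQueue[Int]) extends Consumer[Int] {
    def run(): Unit = while (true) sink.put(get())
  }

  // Builds numbers ==> square ==> collect and waits for five results.
  def runDemo(): List[Int] = {
    val sink = new LinkedBlockingQueue[Int]()
    val numbers = new Numbers(5)
    val square = new Square
    val collect = new Collect(sink)
    val pipeline = numbers ==> square ==> collect
    pipeline.start()
    List.fill(5)(sink.take())
  }

  def main(args: Array[String]): Unit =
    println(runDemo()) // prints List(1, 4, 9, 16, 25)
}
```

Each stage runs on its own thread and talks to its neighbours only through the blocking queues, so the results arrive in order. Note that the fused loop feeds a transducer one input and then waits for one output, so this sketch only uses a transducer that emits exactly one value per input.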
One magic line
Using coroutines, the punchline in the web server becomes a single line:
listen ==> connect ==> process ==> reply
where
listen is a producer that listens on a port and emits connected sockets;
connect is a transducer that receives newly connected sockets, wraps them into a connection and then extracts HTTP requests (using non-blocking I/O);
process is a transducer that consumes HTTP requests and produces replies; and
reply is a consumer that sends replies back (using non-blocking I/O).
The value of this one-line expression is a web server.
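The four components themselves are in the downloadable source rather than shown here. As a rough illustration of the first stage only, the following is a minimal sketch of what a producer like listen might look like, assuming a blocking accept() on java.net.ServerSocket. The class name Listen, the daemon thread and the demo helper are hypothetical and not taken from the actual server.

```scala
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.util.concurrent.ArrayBlockingQueue

object ListenSketch {

  // Condensed restatement of the traits above.
  // Assumption of this sketch: the thread is a daemon so the JVM can exit.
  trait Coroutine extends Runnable {
    def start(): Unit = {
      val thread = new Thread(this)
      thread.setDaemon(true)
      thread.start()
    }
  }

  trait Producer[O] extends Coroutine {
    private val outputs = new ArrayBlockingQueue[O](1024)
    protected def put(output: O): Unit = outputs.put(output)
    def next(): O = outputs.take()
  }

  // Hypothetical listener: blocks in accept() and emits each connected
  // socket. It stops quietly once the server socket has been closed.
  class Listen(server: ServerSocket) extends Producer[Socket] {
    def run(): Unit =
      try {
        while (true) put(server.accept())
      } catch {
        case _: SocketException => () // server socket was closed
      }
  }

  // Connects one client over loopback and checks that the listener
  // emits the matching server-side socket.
  def demo(): Boolean = {
    val server = new ServerSocket(0) // port 0: let the OS pick a free port
    val listen = new Listen(server)
    listen.start()

    val client = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
    val accepted = listen.next()
    val matches = accepted.getPort == client.getLocalPort

    client.close()
    accepted.close()
    server.close()
    matches
  }
}
```

A downstream stage would pull sockets from this producer through next(), which is what the ==> operator does when it fuses listen with connect.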
Web server source code
Download a zip with source, README and a Makefile.