A pipelined, non-blocking, extensible web server in under 1400 lines of Scala

Web servers are easy to write, but hard to get right. Before I became a professor, I ended up writing a few web servers to circumvent scaling issues with Apache and PHP. I say "a few" because the first two I wrote were so awful that they had to be scrapped.

I had a chat with Greg Morrisett a few summers ago about what a "correct" (or "secure") web server should do, and it didn't take long to realize that even defining correct is a challenge.

As just one (important) example, a correct web server can't use blocking I/O, ever. If you make a blocking call, a slow or malicious client can tie up the thread that made it, and you've created a denial-of-service vulnerability. Writing a web server that uses blocking I/O calls isn't hard. But you can't take a web server with blocking I/O and then drop in non-blocking I/O; it requires a fundamentally more complex design.

In my research, I claim that constructs like coroutines make it easier to design systems software (like web servers), that they expose parallelism and that their inefficiencies can be optimized away by a compiler. Ironically, while I've implemented the coroutine optimizers, I'd never actually built a system out of coroutines.

Last night, I decided to test the coroutines-make-it-easier hypothesis by writing a web server that uses non-blocking I/O. (This also gives me a benchmark on which to test my optimizations.) This informal experiment lends some credibility to my claim, since I was actually able to build a reasonable web server in just one night, and the architecture is still extensible and clean. I don't feel the need to rewrite it.

Read on for the details and the code.

Coroutines

I tend to think of a coroutine as a communicating process. A producer is a coroutine that emits values. A consumer is a coroutine that receives values. A transducer is a coroutine that both emits and receives. It's easy to represent coroutines as threads in Scala:

trait Coroutine extends Runnable {
  def start () {
    val myThread = new Thread(this) ;
    myThread.start() ;
  }
}

(It's also possible to use continuations, which Scala now has, to build coroutines.)

A simple (only mildly inefficient) implementation of consumers, producers and transducers uses thread-safe queues to buffer inputs and outputs. Producers call put() inside an infinite loop in their run() method when they have a value to emit:

import java.util.concurrent.ArrayBlockingQueue ;

/**
 An O-producer is a coroutine that produces
 type-O objects for consumption by other
 coroutines.
 */
trait Producer[O] extends Coroutine {
  private val outputs =
    new ArrayBlockingQueue[O] (1024) ;

  /**
   Called by the coroutine's run
   method when it has produced a new output.
   */
  protected def put (output : O) {
    outputs.put(output) ;
  }

  /**
   Called by a downstream consumer when it
   wants a new value from this coroutine.
   */
  def next () : O = {
    outputs.take() ;
  }

  /**
   Composes this producing coroutine
   with a transducing coroutine.

   @return A fused producing coroutine.
   */
  def ==>[O2] (t : Transducer[O,O2])
                  : Producer[O2] = {
    val that = this ;
    new Producer[O2] {
      def run () {
        while (true) {
          // Forward one input to t, then wait
          // for t to emit one output for it.
          val o = that.next() ;
          t.accept(o) ;
          put(t.next()) ;
        }
      }
      
      override def start () {
        that.start() ;
        t.start() ;
        super.start() ;
      }
    }
  }

  /**
   Composes this producing coroutine
   with a consuming coroutine.

   @return A fused coroutine.
   */
  def ==> (consumer : Consumer[O])
             : Coroutine = {
    val that = this ;
    new Coroutine {
      def run () {
        while (true) {
          val o = that.next() ;
          consumer.accept(o) ;
        }
      }

      override def start () {
        that.start() ;
        consumer.start() ;
        super.start() ;
      }
    }
  }
}
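
To make the producer side concrete, here is a minimal sketch of a producer that counts upward. The names Counter and CounterDemo are made up for illustration, and the traits are condensed copies of the ones above so the snippet stands alone. Two assumptions differ from the listing: methods are written with an explicit ": Unit =" so the code also compiles on recent Scala versions, and the thread is marked as a daemon so a short demo program can exit.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Condensed copy of the Coroutine trait from the article.
trait Coroutine extends Runnable {
  def start(): Unit = {
    val myThread = new Thread(this)
    myThread.setDaemon(true) // assumption: lets a short demo program exit
    myThread.start()
  }
}

// Condensed copy of Producer, without the ==> operators.
trait Producer[O] extends Coroutine {
  private val outputs = new ArrayBlockingQueue[O](1024)
  protected def put(output: O): Unit = outputs.put(output)
  def next(): O = outputs.take()
}

// Hypothetical producer: emits 0, 1, 2, ... forever.
class Counter extends Producer[Int] {
  def run(): Unit = {
    var n = 0
    while (true) {
      put(n) // blocks once 1024 values are waiting in the queue
      n += 1
    }
  }
}

object CounterDemo {
  // Starts a Counter and pulls its first k values.
  def firstValues(k: Int): List[Int] = {
    val counter = new Counter
    counter.start()
    List.fill(k)(counter.next())
  }
}
```

Because put() blocks when the queue is full, a fast producer is throttled by whoever calls next(); the bounded queue doubles as back-pressure.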

Consumers call get() inside their run() method when they need the next input:

/**
 An I-consumer is a coroutine that consumes
 type-I objects.
 */
trait Consumer[I] extends Coroutine {
  private val inputs =
    new ArrayBlockingQueue[I] (1024) ;

  /**
   Called when an external I-producer
   has a new input to provide.
   */
  def accept (input : I) {
    inputs.put(input) ;
  }

  /**
   Called by the coroutine itself
   when it needs the next input.
   */
  protected def get () : I = {
    inputs.take() ;
  }
}
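
As a rough illustration of the consumer side, the sketch below defines a consumer that records the length of every string it receives. LengthRecorder, its seen queue and ConsumerDemo are hypothetical names that exist only for this example; the traits are condensed copies of the ones above, written with an explicit ": Unit =" and daemon threads (both assumptions, so the demo compiles on recent Scala versions and can exit).

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}

// Condensed copy of the Coroutine trait from the article.
trait Coroutine extends Runnable {
  def start(): Unit = {
    val myThread = new Thread(this)
    myThread.setDaemon(true) // assumption: lets a short demo program exit
    myThread.start()
  }
}

// Condensed copy of the Consumer trait from the article.
trait Consumer[I] extends Coroutine {
  private val inputs = new ArrayBlockingQueue[I](1024)
  def accept(input: I): Unit = inputs.put(input)
  protected def get(): I = inputs.take()
}

// Hypothetical consumer: records the length of each string it receives.
class LengthRecorder extends Consumer[String] {
  // Exposed only so the demo can observe what was consumed.
  val seen = new LinkedBlockingQueue[Int]()

  def run(): Unit = {
    while (true) {
      val s = get() // blocks until someone calls accept()
      seen.put(s.length)
    }
  }
}

object ConsumerDemo {
  // Feeds the words to a LengthRecorder and collects what it recorded.
  def lengthsOf(words: List[String]): List[Int] = {
    val recorder = new LengthRecorder
    recorder.start()
    words.foreach(recorder.accept)
    // Wait up to a second per item for the consumer thread to catch up.
    words.map(_ => recorder.seen.poll(1, TimeUnit.SECONDS))
  }
}
```

Note the asymmetry with producers: accept() is public because other coroutines push values in, while get() is protected because only the consumer's own run() loop pulls them out.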

Transducers are literally both a consumer and a producer:

/**
 An I,O-transducer consumes
 type-I objects and produces
 type-O objects.
 */
trait Transducer[I,O]
 extends Consumer[I] with Producer[O] 
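
Putting the three kinds of coroutine together, the following sketch wires a producer, a transducer and a consumer into one pipeline with ==>. The stages (Numbers, Square, Collect) and PipelineDemo are invented for illustration; the traits are condensed from the listings above, using a self alias instead of "val that = this", an explicit ": Unit =", and daemon threads, all of which are assumptions made so the snippet stands alone and terminates.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}

trait Coroutine extends Runnable {
  def start(): Unit = {
    val t = new Thread(this)
    t.setDaemon(true) // assumption: lets a short demo program exit
    t.start()
  }
}

trait Consumer[I] extends Coroutine {
  private val inputs = new ArrayBlockingQueue[I](1024)
  def accept(input: I): Unit = inputs.put(input)
  protected def get(): I = inputs.take()
}

trait Producer[O] extends Coroutine { self =>
  private val outputs = new ArrayBlockingQueue[O](1024)
  protected def put(output: O): Unit = outputs.put(output)
  def next(): O = outputs.take()

  // Fuse with a transducer: one output is awaited per forwarded input.
  def ==>[O2](t: Transducer[O, O2]): Producer[O2] = new Producer[O2] {
    def run(): Unit = while (true) { t.accept(self.next()); this.put(t.next()) }
    override def start(): Unit = { self.start(); t.start(); super.start() }
  }

  // Fuse with a consumer: the result only needs to be started.
  def ==>(c: Consumer[O]): Coroutine = new Coroutine {
    def run(): Unit = while (true) c.accept(self.next())
    override def start(): Unit = { self.start(); c.start(); super.start() }
  }
}

trait Transducer[I, O] extends Consumer[I] with Producer[O]

// Hypothetical stages: numbers in, squares out, collected at the end.
class Numbers(upTo: Int) extends Producer[Int] {
  def run(): Unit = (1 to upTo).foreach(n => put(n))
}

class Square extends Transducer[Int, Int] {
  def run(): Unit = while (true) { val n = get(); put(n * n) }
}

class Collect extends Consumer[Int] {
  val seen = new LinkedBlockingQueue[Int]() // exposed for the demo only
  def run(): Unit = while (true) seen.put(get())
}

object PipelineDemo {
  // Runs 1..upTo through the pipeline and returns the collected squares.
  def squares(upTo: Int): List[Int] = {
    val source = new Numbers(upTo)
    val square = new Square
    val sink = new Collect
    val pipeline = source ==> square ==> sink
    pipeline.start() // starts every stage, upstream first
    List.fill(upTo)(sink.seen.poll(1, TimeUnit.SECONDS))
  }
}
```

Each stage runs on its own thread and the fused coroutines add a forwarding thread per ==>, which is where the "mildly inefficient" part of this design comes from.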

One magic line

Using coroutines, the punchline in the web server becomes a single line:

 listen ==> connect ==> process ==> reply

where

  • listen is a producer that listens on a port and emits connected sockets;
  • connect is a transducer that receives newly connected sockets, wraps them into a connection and then extracts HTTP requests (using non-blocking I/O);
  • process is a transducer that consumes HTTP requests and produces replies; and
  • reply is a consumer that sends replies back (using non-blocking I/O).

The value of this one-line expression is a web server.
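
To give a feel for what the first stage might look like, here is a hedged sketch of a listen-style producer. It is not taken from the downloadable source: the class name Listen, the use of java.nio's ServerSocketChannel and SocketChannel, the port accessor and ListenDemo are all assumptions. For brevity it calls a blocking accept() on its own thread and only switches the accepted socket to non-blocking mode; the real server may well avoid even that.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.{ServerSocketChannel, SocketChannel}
import java.util.concurrent.ArrayBlockingQueue

// Condensed copies of the article's traits, so the sketch stands alone.
trait Coroutine extends Runnable {
  def start(): Unit = {
    val t = new Thread(this)
    t.setDaemon(true) // assumption: lets a short demo program exit
    t.start()
  }
}

trait Producer[O] extends Coroutine {
  private val outputs = new ArrayBlockingQueue[O](1024)
  protected def put(output: O): Unit = outputs.put(output)
  def next(): O = outputs.take()
}

// Hypothetical listen stage: binds a loopback port and emits accepted sockets.
class Listen(requestedPort: Int) extends Producer[SocketChannel] {
  private val server = ServerSocketChannel.open()
  server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, requestedPort))

  // The port actually bound (useful when requestedPort is 0).
  def port: Int = server.socket().getLocalPort

  def run(): Unit = {
    while (true) {
      val socket = server.accept()    // simplification: blocking accept
      socket.configureBlocking(false) // later stages can read without blocking
      put(socket)
    }
  }
}

object ListenDemo {
  // Connects one client over loopback and returns the socket the stage emitted.
  def acceptOne(): SocketChannel = {
    val listen = new Listen(0)
    listen.start()
    val address = new InetSocketAddress(InetAddress.getLoopbackAddress, listen.port)
    val client = SocketChannel.open(address)
    try listen.next() finally client.close()
  }
}
```

A connect stage in this style would be a Transducer[SocketChannel, Request] for some request type, which is what lets the four stages line up under ==>.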

Web server source code

Download a zip with source, README and a Makefile.