Asynchronous concurrency with vert.x – Part 2
CoffeeScript
Vert.x supports JavaScript through the Rhino JavaScript engine. Although JavaScript is a decent language once you get to know it, I prefer CoffeeScript, a language that compiles to JavaScript. Luckily, vert.x has built-in support for CoffeeScript, so I can use it nearly transparently. You will only notice the JavaScript under the hood when reading stack traces, which will refer to the compiled JavaScript file.
For the examples in this blog post, you only need to know a little CoffeeScript:
foo = (a, b) -> a + b
This translates to the JavaScript code:
var foo = function (a, b) {
  return a + b; // (the last statement is returned)
};
Also, parentheses around function arguments are optional:
foo a, b, c # same as foo(a, b, c)
The translated source code of the example described in the last post is:
vertx = require 'vertx'

address = 'example.address'

handler = (message, replier) ->
  stdout.println "sender sent " + message
  replier "pong 1", (message, replier) ->
    # and so on

vertx.eventBus.registerHandler address, handler

vertx.eventBus.send address, "ping 1", (message, replier) ->
  stdout.println "handler sent " + message
  replier "ping 2", (message, replier) ->
    # and so on
The shorter function declaration notation is a huge improvement, especially for the kind of callback-heavy code that is prevalent in asynchronous concurrency.
The Sleeping Barber Problem
To challenge vert.x with something more exciting than ping-pong, I decided to model a basic concurrency problem that mirrors some of the challenges that our new application will face – the famous Sleeping Barber Problem:
The analogy is based upon a hypothetical barber shop with one barber. The barber has one barber chair and a waiting room with a number of chairs in it. When the barber finishes cutting a customer’s hair, he dismisses the customer and then goes to the waiting room to see if there are other customers waiting. If there are, he brings one of them back to the chair and cuts his hair. If there are no other customers waiting, he returns to his chair and sleeps in it.
Each customer, when he arrives, looks to see what the barber is doing. If the barber is sleeping, then the customer wakes him up and sits in the chair. If the barber is cutting hair, then the customer goes to the waiting room. If there is a free chair in the waiting room, the customer sits in it and waits his turn. If there is no free chair, then the customer leaves. Based on a naïve analysis, the above description should ensure that the shop functions correctly, with the barber cutting the hair of anyone who arrives until there are no more customers, and then sleeping until the next customer arrives. In practice, there are a number of problems that can occur that are illustrative of general scheduling problems.
I’ve previously solved this problem using Software Transactional Memory and was interested in how the message-passing style of vert.x compares.
Barber.coffee
The barber shop problem nicely separates into two systems: a barber message handler that keeps track of incoming customers and manages the queue, and a set of callback methods representing the customer, which initiate a communication sequence with the message handler. The following code defines the barber message handler.
vertx = require 'vertx'

addr = 'barber'

# make the system a little indeterministic
waitTime = -> Math.random() * 100

barber = ->
  # the state of the message handler lives
  # in this closure
  busy = false
  queue = []
  freeSeats = 20

  log = (message) -> stdout.println "barber: #{message}"

  # the following methods define the core behavior
  checkQueue = ->
    if queue.length > 0
      serveCustomer queue.shift()
      freeSeats += 1
      return true
    else
      return false

  serveCustomer = ({customer, replier}) ->
    log "serving #{customer}"
    busy = true
    replier 'serve', (message, replier) ->
      vertx.setTimer waitTime(), ->
        log "done serving #{customer}"
        busy = checkQueue()
        replier 'done'

  # this is the handler's callback method that
  # is being returned by the barber function
  (message, replier) ->
    customer = message
    if busy
      # there is an intermediate state where we know that we
      # have to queue the customer because there aren't any
      # free seats, but the customer must first acknowledge
      # the waiting state before we can actually put him in
      # the queue.
      if freeSeats > 0
        freeSeats -= 1
        log "sending #{customer} to queue"
        replier 'busy', (message, replier) ->
          # customer waiting ack
          queue.push {customer, replier}
          log "queued #{customer} - " +
            "length: #{queue.length} - free seats: #{freeSeats}"
      else
        replier 'full'
    else
      serveCustomer {customer, replier}

exports.start = ->
  vertx.eventBus.registerHandler addr, barber()
The state of the barber is encoded by the callback method that will be called for the next incoming event and by the values of the variables defined in the closure. By being able to store repliers, you can easily trigger remote state changes atomically, at exactly the moment they should occur.
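Stored repliers are worth a closer look. Here is a minimal sketch of the pattern in isolation, assuming the same vert.x CoffeeScript environment as above; the 'gate' address, the timeout and the handler itself are invented for illustration and are not part of the barber shop code:

vertx = require 'vertx'

# park incoming repliers instead of answering immediately
waitingRepliers = []

vertx.eventBus.registerHandler 'gate', (message, replier) ->
  # don't answer yet - just remember how to answer later
  waitingRepliers.push replier

# after one second, release every waiting sender at once;
# each call triggers the corresponding sender's next callback
vertx.setTimer 1000, ->
  r 'go' for r in waitingRepliers
  waitingRepliers = []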
Customer.coffee
Let’s define the behavior of the customer in a separate file:
vertx = require 'vertx'

addr = 'barber'

waitTime = -> Math.random() * 100

sendCustomer = (i) ->
  # As with the barber, the customer's state is
  # defined in this closure. The variables will
  # be modified by the callback methods that are
  # triggered by the message handler's replies.
  waiting = false
  beingServed = false

  log = (message) -> stdout.println "customer #{i}: #{message}"

  # just a shorthand
  send = (message, callback) -> vertx.eventBus.send addr, message, callback

  # factor out the exit method:
  # a customer can exit after having been served
  # or when the queue is full
  exit = (message) ->
    log message + " - exiting"
    # this method doesn't send a response
    # via the replier

  getHaircut = (message, replier) ->
    waiting = false
    beingServed = true
    log "being served"
    replier 'being-served', exit

  log "enters"
  send "customer #{i}", (message, replier) ->
    switch message
      when 'busy'
        waiting = true
        log 'waiting'
        replier 'waiting', getHaircut
      when 'serve'
        getHaircut message, replier
      when 'full'
        exit message

# a loop that continuously sends customers
# to the barber
sendCustomerLoop = (i) ->
  sendCustomer i
  vertx.setTimer waitTime(), ->
    sendCustomerLoop i + 1

exports.start = ->
  sendCustomerLoop 1
barbershop.coffee
This time, we want to run both the handler and the sender in the same process, for easier testing:
barber = require 'barber'
customer = require 'customer'

barber.start()
customer.start()
Running the shop
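Assuming a vert.x installation with the built-in CoffeeScript support mentioned above, the shop can be started from the command line roughly like this (the exact invocation may differ between vert.x versions):

vertx run barbershop.coffee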
When we start the barbershop.coffee script, we can see in the log that the shop is running as it is supposed to:
customer 1: enters
barber: serving customer 1
customer 1: being served
barber: done serving customer 1
customer 1: done - exiting
customer 2: enters
barber: serving customer 2
customer 2: being served
barber: done serving customer 2
customer 2: done - exiting
customer 3: enters
[...]
This is what the output looks like when there is no congestion at all. By chance, these customers came in just after the previous customer was served. If we wait a little longer, we can see a customer entering while the barber is busy:
barber: serving customer 3
customer 3: being served
customer 4: enters
barber: sending customer 4 to queue
customer 4: waiting
barber: queued customer 4 - length: 1 - free seats: 19
customer 5: enters
barber: sending customer 5 to queue
customer 5: waiting
barber: queued customer 5 - length: 2 - free seats: 18
barber: done serving customer 3
barber: serving customer 4
customer 3: done - exiting
customer 4: being served
As you can see, customer 4 was added to the queue and is served right after customer 3 is done. But what happens if the queue is full? Let’s set waitTime = -> Math.random() * 80 in customer.coffee, so that slightly more customers enter than leave.
customer 34: enters
barber: sending customer 34 to queue
customer 34: waiting
barber: queued customer 34 - length: 20 - free seats: 0
customer 35: enters
customer 35: full - exiting
New customers are being turned away, as expected. The important thing is that there are no deadlocks and no invalid states, which can easily be checked by reading the log output. Knowing that only one callback method is executed at any point in time is a great help when reasoning about the program.
Conclusion
The central primitive is the construct replier(send_message, next_state). The replier triggers a state transition in the remote system through send_message and defines the local next_state.
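Here is a minimal sketch of that idea, assuming the same environment as above; the machine.a address and the state names are invented for illustration and are not taken from the barber shop code:

vertx = require 'vertx'

# machine A: each callback is a state, and replying selects the next one
stateA2 = (message, replier) -> replier 'done'
stateA1 = (message, replier) -> replier 'step 1 ok', stateA2

vertx.eventBus.registerHandler 'machine.a', stateA1

# machine B drives the conversation and steps through its own states
vertx.eventBus.send 'machine.a', 'step 1', (message, replier) ->
  replier 'step 2', (message, replier) ->
    stdout.println "finished with: #{message}"

Every message moves both sides one state forward, and neither side ever blocks.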
If you can model your system as something similar to linked state machines, this concurrency approach is easy to implement and very powerful.