SubScript Actors

Introduction

Programming the control flow of actors in Java or Scala is relatively hard, just as it is with GUIs. In both cases events arrive as calls to listeners; a listener performs some actions, may change state, and may change the set of events the program is interested in. After one such callback returns, the next one will occur at some point.

With SubScript the control flow may be inverted; scripts treat events and internal actions in the same way, and state is largely maintained by these scripts. This page describes how SubScript does this on top of Akka actors. Under the hood much the same happens as in plain Scala versions of Akka actors: there are still partial functions listening to incoming data. This way the full power of the Akka framework remains available.

Example 1: Akka’s ExampleActor

The ExampleActor from the Akka documentation lists the following (some parts deleted):

class ExampleActor extends Actor {
  def receive = {
    case Request(r)               => sender ! calculate(r)
    case Shutdown                 => context.stop(self)
    case Dangerous(r)             => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender)
    case OtherJob(r)              => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender)
    case JobReply(result, orig_s) => orig_s ! result
  }
}

A bit hidden in this specification is the fact that processing stops when a Shutdown message arrives.

A SubScript variant would not have the “receive” callback; instead there would be a “live” script, as in SubScript GUI controllers:

class ExampleSubScriptActor extends SubScriptActor {
  script..
    live = .. <<
      case Request(r)               => {sender ! calculate(r)}
      case Dangerous(r)             => {context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender)}
      case OtherJob(r)              => {context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender)}
      case JobReply(result, orig_s) => {orig_s ! result}
    >> ;
    << Shutdown>>
}

This live script is willing to receive and process zero or more messages until a Shutdown message arrives. After the live script ends, context.stop will be called somewhere under the hood.

A <<>> section contains a so-called “partial script”. This is much like a Scala partial function; the main difference is that the refinement bodies are scripts.
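
For readers less familiar with the Scala side of this comparison, the following is a minimal plain-Scala sketch of a partial function (no Akka or SubScript involved). The message types Request and Shutdown mirror the names in the example above; Other and the handler names are hypothetical and exist only for illustration.

```scala
object PartialFunctionSketch {
  // Illustrative message types; Other is hypothetical.
  sealed trait Message
  final case class Request(r: Int) extends Message
  case object Shutdown extends Message
  final case class Other(text: String) extends Message

  // Defined only for Request messages.
  val handleRequest: PartialFunction[Message, String] = {
    case Request(r) => s"calculated ${r * 2}"
  }

  // Defined only for the Shutdown message.
  val handleShutdown: PartialFunction[Message, String] = {
    case Shutdown => "stopping"
  }

  // orElse tries the first function and falls back to the second.
  val handler: PartialFunction[Message, String] =
    handleRequest orElse handleShutdown

  def main(args: Array[String]): Unit = {
    println(handler.isDefinedAt(Request(21))) // true
    println(handler(Request(21)))             // calculated 42
    println(handler.isDefinedAt(Other("x")))  // false
    println(handler.lift(Other("x")))         // None
  }
}
```

A partial function only answers for the inputs its cases match, which is why several of them can be composed or, as in SubScript, be active side by side.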

To the right of the arrows the value sender denotes the actor that sent the value specified at the left hand side of the arrow. This is a bit different from the sender in class Actor, which denotes the sender of the last received message. That definition could be misleading in SubScript actors when multiple <<>> sections are concurrently active.
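
The difference between the two notions of sender can be modelled in plain Scala. The sketch below is not Akka or SubScript code: Envelope, deferredReplies and the string senders are hypothetical stand-ins. It compares a shared "sender of the last received message" variable with a value fixed per message, for replies that are only produced after further messages have arrived.

```scala
object SenderCaptureSketch {
  // Hypothetical stand-in for a message together with its sender.
  final case class Envelope(message: String, sender: String)

  // Returns (replies using the shared last-sender variable,
  //          replies using a per-message value).
  def deferredReplies(inbox: Seq[Envelope]): (Seq[String], Seq[String]) = {
    var lastSender = "" // shared, overwritten on every message
    val viaShared  = Seq.newBuilder[() => String]
    val viaCapture = Seq.newBuilder[() => String]
    for (env <- inbox) {
      lastSender = env.sender
      val captured = env.sender // fixed for this message
      viaShared  += (() => s"${env.message} -> $lastSender")
      viaCapture += (() => s"${env.message} -> $captured")
    }
    // The replies are evaluated only after all messages have arrived.
    (viaShared.result().map(_.apply()), viaCapture.result().map(_.apply()))
  }

  def main(args: Array[String]): Unit = {
    val inbox = Seq(Envelope("m1", "alice"), Envelope("m2", "bob"))
    val (shared, captured) = deferredReplies(inbox)
    println(shared)   // both replies go to bob
    println(captured) // m1 goes to alice, m2 goes to bob
  }
}
```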

Two rules enable the concise notation for the input message <<Shutdown>>:

  • if there is only one case then the case keyword may be omitted
  • if there is nothing to do in the body this may be left out together with the arrow

Example 2: A Finite State Machine

In the first example the SubScript version hardly improves on the Scala version. SubScript becomes useful when the state of an actor is more important. The Akka documentation contains an example of the support for Finite State Machines (FSM):

Consider an actor which shall receive and queue messages while they arrive in a burst and send them on after the burst ended or a flush request is received.

The actor receives the following messages:

case class SetTarget(ref: ActorRef)
case class Queue(obj: Any)
case object Flush

The SetTarget message should be the first received message. As soon as the first Queue message arrives a timeout period starts; on this timeout, or earlier when a Flush message arrives, the queue is flushed by sending the following message to the target actor:

case class Batch(obj: Seq[Any])

For unexpected incoming messages (e.g. an extra SetTarget message) warnings should be logged.
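
Before looking at actor code, the required behaviour can be sketched as a pure transition function in plain Scala. This is only a model under stated assumptions, not the Akka or SubScript solution: a String stands in for the ActorRef, the timeout is represented by a hypothetical Timeout input, and the names step, run and Warning are introduced here for illustration.

```scala
object BuncherModel {
  // Inputs: the three messages from the text plus a timeout tick.
  sealed trait Input
  final case class SetTarget(target: String) extends Input // String stands in for ActorRef
  final case class Queue(obj: Any) extends Input
  case object Flush extends Input
  case object Timeout extends Input // hypothetical: models the expired timer

  // Outputs a step may produce.
  sealed trait Output
  final case class Batch(obj: Seq[Any]) extends Output
  final case class Warning(text: String) extends Output

  sealed trait State
  case object Uninitialized extends State
  final case class Idle(target: String) extends State
  final case class Active(target: String, queue: Vector[Any]) extends State

  // One transition: the new state plus an optional output.
  def step(state: State, in: Input): (State, Option[Output]) = (state, in) match {
    case (Uninitialized, SetTarget(t))   => (Idle(t), None)
    case (Idle(t), Queue(o))             => (Active(t, Vector(o)), None) // burst starts
    case (Active(t, q), Queue(o))        => (Active(t, q :+ o), None)
    case (Active(t, q), Flush | Timeout) => (Idle(t), Some(Batch(q)))    // burst ends
    case (s, other)                      => (s, Some(Warning(s"unhandled $other in $s")))
  }

  // Feed a sequence of inputs through the machine, collecting the outputs.
  def run(inputs: Seq[Input]): (State, Vector[Output]) = {
    val init: (State, Vector[Output]) = (Uninitialized, Vector.empty)
    inputs.foldLeft(init) { case ((s, outs), in) =>
      val (next, out) = step(s, in)
      (next, outs ++ out)
    }
  }
}
```

For instance, run(Seq(SetTarget("t"), Queue(1), Queue(2), Flush)) ends in Idle("t") with a single Batch(Vector(1, 2)) as output; a second SetTarget at any later point yields a Warning and leaves the state unchanged.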

The plain Scala solution from the Akka documentation is:

sealed trait State

case object Idle   extends State
case object Active extends State

sealed trait Data
case object Uninitialized extends Data
case class  Todo(target: ActorRef, queue: Seq[Any]) extends Data

class Buncher extends Actor with FSM[State,Data]
{
  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) => stay using Todo(ref, Vector.empty)}

  onTransition {
    case Active -> Idle => stateData match {case Todo(ref,q) => ref ! Batch(q)} }

  when(Active, stateTimeout = 1 second) {
    case Event(Flush | StateTimeout, t: Todo) => goto(Idle) using t.copy(queue = Vector.empty)}

  whenUnhandled {// common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) => goto(Active) using t.copy(queue=v:+obj)

    case Event(e, s) => log.warning("unhandled request {} in state {}/{}", e, stateName, s)
                        stay
  }
  initialize
}

The SubScript version is shorter:

class SubScriptBuncher extends SubScriptActor {
  val timer = new ScriptTimer
  var target: ActorRef = _
  var q: Seq[Any] = _ // q shorthand for queue

 script..

  live = <<SetTarget(ref) => {target=ref; q=Vector.empty}>>
         ( ..
           <<Queue(obj) => {q:+=obj}>>
           if (pass==0) timer.start
         ; <<Flush>> + timer.timeout(1 second)
         ; {target!Batch(q); q=Vector.empty; timer.stop}
         ; ...
         )

  def unhandled = {case e=>log.warning("received unhandled request {}", e)}
}

Here timer is an object with a start method and a script that handles a timeout event after a given time has passed.

pass is a loop counter; the condition pass==0 makes sure the timer gets started only once in each burst.

Note that the logged warning message does not give state information, unlike the plain Scala solution.

The timeout script is predefined in SubScriptActor. It is meant to perform an atomic action once a given duration has elapsed since the call to the script; the script call may be deactivated earlier, e.g., in the above example when a Flush message arrives first. The code is rather complicated, just like the event-handling GUI scripts in subscript.swing.Scripts:

timeOut(d: Duration) = @{val executor = new EventHandlingCodeFragmentExecutor(there, there.scriptExecutor)
                         val cancellable = system.scheduler.scheduleOnce(d) {executor.executeMatching(true)}
                         there.onDeactivate(cancellable.cancel)}:
                        {. .}
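
The underlying idea, scheduling a one-off action and cancelling it when the waiting party is deactivated first, can be sketched in plain Scala on top of java.util.concurrent. This is not the SubScript implementation; the object name, the scheduleOnce helper and the shutdown method are hypothetical and chosen only to echo the listing above.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object CancellableTimeoutSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Run `action` once after `delayMillis`; the returned handle can cancel it.
  def scheduleOnce(delayMillis: Long)(action: => Unit): ScheduledFuture[_] =
    scheduler.schedule(
      new Runnable { def run(): Unit = action },
      delayMillis,
      TimeUnit.MILLISECONDS)

  // Release the scheduler thread when no more timeouts are needed.
  def shutdown(): Unit = scheduler.shutdown()

  def main(args: Array[String]): Unit = {
    val fired = new AtomicBoolean(false)
    val handle = scheduleOnce(200) { fired.set(true) }
    handle.cancel(false) // "deactivation" happens before the timeout
    Thread.sleep(400)
    println(s"fired = ${fired.get}") // fired = false
    shutdown()
  }
}
```

Cancelling on deactivation corresponds to the there.onDeactivate(cancellable.cancel) call in the script above: a Flush that arrives first must prevent the timeout action from running later.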

Example 3: Parallel Computation

An actor typically performs a task upon request and sends the results back, or to another actor. To exploit parallelism for quick calculation, the actor may split the received task and delegate the parts to newly created other actors. After all delegates have returned their results, the aggregate result is available and the delegating actor is ready to send it on.
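
The split, delegate and aggregate pattern can be illustrated without actors, using Scala futures. The sketch below is an assumption-laden stand-in, not code from any project mentioned on this page: the object and method names are hypothetical, and each part is integrated exactly via the antiderivative of a polynomial rather than by a delegate actor.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelIntegralSketch {
  // Antiderivative of sum(coefs(i) * x^i), evaluated at x.
  def antiderivative(coefs: Seq[Double], x: Double): Double =
    coefs.zipWithIndex.map { case (c, i) => c * math.pow(x, i + 1) / (i + 1) }.sum

  // Exact area under the polynomial between from and to.
  def area(coefs: Seq[Double], from: Double, to: Double): Double =
    antiderivative(coefs, to) - antiderivative(coefs, from)

  // Split [from, to] into `parts` subintervals, compute each area in its own
  // Future, and sum the partial results once all of them have completed.
  def integrate(coefs: Seq[Double], from: Double, to: Double, parts: Int): Double = {
    val step = (to - from) / parts
    val partials: Seq[Future[Double]] =
      (0 until parts).map { i =>
        Future(area(coefs, from + i * step, from + (i + 1) * step))
      }
    // Future.sequence completes only after every partial result is available.
    Await.result(Future.sequence(partials).map(_.sum), 10.seconds)
  }
}
```

Here Future.sequence plays the role of "wait until all delegates have returned", so no explicit counter of outstanding parts is needed.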

For example, ParallelPolynomialIntegral is a GitHub project: an actor system that computes polynomial integrals in parallel. One of the actors is a dispatcher; it creates actors that each perform the calculation for one subinterval, and it also aggregates the results. A counter keeps track of the number of active calculating actors; when it reaches zero, the result aggregation is done. The original code is on GitHub. With some details left out, the code is:

def receive = {
 case computationContext : ComputationContext =>

   //reset the instance variables
   areaSum = 0
   activeActors = 0
   clientActor = …
   val subintervals : Int = …
   val subintervalStep = …

   //Starts a PolynomialIntegralCalculatorActor for each interval
   var lastX = …
   for(i <- 0 until subintervals) {
     val actor = actorOf[AreaUnderTheCurveComputerActor].start
     val toX = lastX + subintervalStep
     actor ! PolynomialInterval (Interval(lastX, toX), computationContext.coefList)
     activeActors += 1
     lastX = toX
   }
   initializationFinished = true

 //Result received for the computation for an interval
 case result : Double =>
   areaSum += result
   self.sender.get.stop
   activeActors -= 1

   //when all actors have sent their computation, sends the sum to the client
   if(initializationFinished && activeActors <= 0) {
     clientActor ! areaSum
   }
}

The SubScript version does not need to keep track of the number of active actors, nor of whether initialization has finished. Also, areaSum and clientActor may now be a local variable and a local value, rather than an instance variable and an instance value:

live = ...
       << computationContext:ComputationContext
       => var areaSum: Double = 0
          val clientActor = …
          val subintervals : Int = …
          val subintervalStep = …
          var lastX = …
          ( times(subintervals)
          & {! val actor = actorOf[AreaUnderTheCurveComputerActor].start
               val toX = lastX + subintervalStep
               actor ! PolynomialInterval(Interval(lastX, toX), computationContext.coefList)
               lastX = toX !}
            << d:Double => {areaSum += d; sender.get.stop} >>
          )
          {clientActor ! areaSum}
       >>

TBD

At the time of writing, the above has not yet been implemented. It may be good to change the syntax a bit. In many cases the partial script inside <<>> will have a single code fragment on the right-hand side of the arrow =>. It may be better to make such a code fragment the default case, so that no braces are needed. When the right-hand side is a more general script expression than a single code fragment, a bigger arrow (==>) would be used.

This way the first example would lose four brace pairs:

class ExampleSubScriptActor extends SubScriptActor {
  script..
    live = .. <<
      case Request(r)               => sender ! calculate(r)
      case Dangerous(r)             => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender)
      case OtherJob(r)              => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender)
      case JobReply(result, orig_s) => orig_s ! result
    >> ;
    << Shutdown>>
}

The second example would change likewise. The third example would lose a brace pair, and its outer arrow would become the larger one:

live = ...
       << computationContext:ComputationContext
      ==> var areaSum: Double = 0
          val clientActor = …
          val subintervals : Int = …
          val subintervalStep = …
          var lastX = …
          ( times(subintervals)
          & {! val actor = actorOf[AreaUnderTheCurveComputerActor].start
               val toX = lastX + subintervalStep
               actor ! PolynomialInterval(Interval(lastX, toX), computationContext.coefList)
               lastX = toX !}
            << d:Double => areaSum += d; sender.get.stop >>
          )
          {clientActor ! areaSum}
       >>