Dataflow Programming

A relatively new SubScript language feature is dataflow, expressed by curly arrows as seen in the GUI controller example:

exit = exitCommand @gui: {! confirmExit !} ~~(b:Boolean)~~> while(!b)

We could also have used a shorter form of the dataflow operator, which does not give a name and type to the flowing data item:

exit = exitCommand @gui: {! confirmExit !} ~~> while(!_)

Now the while on the right-hand side has an underscore as its parameter. As in Scala, the underscore is a placeholder for a parameter, and it turns its enclosing expression into a parameterized lambda. The left-hand operand is also a lambda, so that it has its own result value.
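
For readers less familiar with Scala's placeholder syntax, the following plain-Scala sketch shows the correspondence that the two forms of the arrow mirror. It is only an analogy: the names PlaceholderDemo, named and shorthand are illustrative and are not part of SubScript.

```scala
object PlaceholderDemo {
  // Explicitly named and typed parameter, comparable to ~~(b:Boolean)~~> while(!b)
  val named: Boolean => Boolean = (b: Boolean) => !b

  // Placeholder form, comparable to ~~> while(!_);
  // the compiler expands !_ into an anonymous function x => !x
  val shorthand: Boolean => Boolean = !_

  def main(args: Array[String]): Unit = {
    println(named(true))
    println(shorthand(true))
  }
}
```

Both values denote the same function; the placeholder form simply leaves the parameter unnamed.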

The dataflow construct is a kind of sequential composition, a difference being that it cannot become a loop.

A similar construct lets exceptions flow. For example, in x ~/~> y, when x ends in failure (without success), x’s result is a Failure wrapper containing either an exception or null. Then y is executed with that exception or null as a parameter.

Such dataflow and exception flow may be combined in a ternary operator:

x ~~> y +~/~> z

This starts with x. When x has success, y is activated with x’s normal result value. When x terminates as a failure, z is executed with x’s resulting exception.
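
As a rough plain-Scala analogy for the ternary operator, the sketch below routes the outcome of a synchronous computation to one of two continuations using scala.util.Try. The helper name flow is hypothetical, and the sketch makes simplifying assumptions: x is an ordinary by-name expression rather than a script, only non-fatal exceptions are caught (that is how Try behaves), and the case of a failure that carries null instead of an exception is not modeled.

```scala
import scala.util.{Try, Success, Failure}

object FlowDemo {
  // Approximates x ~~> y +~/~> z for a synchronous computation x
  def flow[A, B](x: => A)(y: A => B)(z: Throwable => B): B =
    Try(x) match {
      case Success(value) => y(value) // success: the result value flows to y
      case Failure(error) => z(error) // failure: the exception flows to z
    }

  def main(args: Array[String]): Unit = {
    println(flow("42".toInt)(n => s"parsed $n")(e => s"failed: ${e.getMessage}"))
    println(flow("x".toInt)(n => s"parsed $n")(e => s"failed: ${e.getMessage}"))
  }
}
```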

Similar variations are possible for dataflow operators with named items, so that these become analogous to a combination of match statements and exception handlers, e.g.,

x ~~(b: Boolean    )~~> y1
 +~~(i: Int if i<10)~~> y2
 +~~( _            )~~> y3
 +~/~(e:IOException)~~> z1
 +~/~(e:  Exception)~~> z2
 +~/~(e:  Throwable)~~> z3
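
The analogy with match statements and exception handlers can be made concrete in plain Scala. The following is a sketch under the same assumptions as before (a synchronous computation wrapped in scala.util.Try, which catches only non-fatal exceptions); the name describe and the result strings are illustrative only.

```scala
import java.io.IOException
import scala.util.{Try, Success, Failure}

object MatchFlowDemo {
  // One branch per arrow: typed and guarded patterns for results,
  // increasingly general patterns for exceptions
  def describe(x: => Any): String =
    Try(x) match {
      case Success(b: Boolean)       => s"boolean $b"
      case Success(i: Int) if i < 10 => s"small int $i"
      case Success(_)                => "other value"
      case Failure(e: IOException)   => s"io error: ${e.getMessage}"
      case Failure(e: Exception)     => s"exception: ${e.getMessage}"
      case Failure(e)                => s"throwable: ${e.getMessage}"
    }
}
```

As with catch clauses, the order matters: the more specific exception types must be listed before the more general ones.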


Example: A Slick 3 Query

Slick is a database query and access library for Scala. Version 3 supports a reactive approach for composing and running database queries. The following Slick code was taken from the article Reactive Streams for Asynchronous Database Access in Scala:

val q = for (c<-coffees) yield c.name
val a = q.result
val f: Future[Seq[String]] = db.run(a)

f.onSuccess { case s => println(s"Result: $s") }

With a SubScript dataflow operator this could simply be:

val q = for (c<-coffees) yield c.name

q ~~(s)~~> println(s"Result: $s")
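
For comparison, here is a minimal sketch of the same "result flows into a continuation" idea using only the standard library's Future, without Slick. The function fetchNames is a hypothetical stand-in for running the query and returns fixed sample data. Note that Future.onSuccess was deprecated in Scala 2.12 and removed in 2.13, so the sketch uses map (foreach would serve for a pure side effect).

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FutureFlowDemo {
  // Hypothetical stand-in for the database call; not part of Slick
  def fetchNames(): Future[Seq[String]] =
    Future(Seq("Colombian", "Espresso"))

  // The successful result flows into the continuation passed to map
  def report(): Future[String] =
    fetchNames().map(s => s"Result: $s")
}
```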


Example: A Twitter Search Application

A simple Twitter search application contains an input text field and a result text area; when the user has changed the content of the input text field, the application starts a request to the Twitter web service to get 10 tweets matching the input text. But Twitter imposes a request rate limit on its API, and the client should not exceed it. Therefore, after each change in the text field the application waits 200 milliseconds before sending the request to Twitter. If the text field changes again in the meantime, the wait restarts. When the input text changes after a request has been sent and while its result is still awaited, that request is disrupted as well.

The searches may go wrong; we can (intentionally) send an empty search string, which will result in an error reply from the Twitter server. A pure Scala version of the controller would contain something like:

def bindInputCallback = {

  val fWait   = InterruptableFuture {...}
  val fSearch = InterruptableFuture {...} 

  reactions += {case _ => fWait.execute()
      .flatMap {case _ => fSearch.execute()}
    .onComplete{case Success(tweets)      => Swing.onEDT{...}
                case Failure(e:Throwable) => Swing.onEDT{...}
} } }

InterruptableFutures are a flavor of futures that can be cancelled on demand. In pure Scala this functionality requires a fair amount of ad hoc utility code, whereas SubScript supports it out of the box, backed by theory.
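
To give an impression of the kind of utility code involved, the restartable wait alone might be sketched in plain Scala as follows. This is not the InterruptableFuture from the example above; Debouncer is a hypothetical helper built on java.util.concurrent, and it only cancels an action that is still waiting, not one that is already running.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical helper: runs `action` only after `delayMs` have passed without a new event
final class Debouncer(delayMs: Long)(action: String => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var pending: Option[ScheduledFuture[_]] = None

  // Each event cancels the pending run, if any, and restarts the wait
  def onEvent(text: String): Unit = synchronized {
    pending.foreach(_.cancel(false))
    val task = new Runnable { def run(): Unit = action(text) }
    pending = Some(scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS))
  }

  // Stops accepting new work; call when the view is closed
  def shutdown(): Unit = scheduler.shutdown()
}
```

A caller would create one instance, for example new Debouncer(200)(text => startSearch(text)) with a hypothetical startSearch, and invoke onEvent from the text field's listener. Cancelling a search that is already in flight would need further code.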

The SubScript version has a live script for the controller, containing a loop of complete search sequences.

live    = initialize; (mainSeq/..)... 

mainSeq = anyEvent(view.searchField)
          {* Thread sleep keyTypeDelay *}
          {* searchTweets *} ~~(ts:Seq[Tweet])~~>updateView(ts)
                           +~/~(t: Throwable )~~>setErrorMsg(t) 

updateView      (ts: Seq[Tweet]) = @gui: {...}
setErrorMsg     (t : Throwable ) = @gui: {...}

The slash and the two dots in mainSeq/.. denote a disruptive loop that starts by activating one instance of mainSeq. As soon as the first atomic action therein happens (anyEvent in the search field), the next iteration of the disruptive loop is activated. Thus, if another anyEvent arrives before the rest of the ongoing mainSeq instance has terminated successfully, that ongoing instance is disrupted, a new delay starts, and a new instance of mainSeq is activated, and so on. The disruptive loop ends when such a mainSeq instance has terminated successfully.

It is also possible to use futures in the script. Suppose we have

def delay  = Future{Thread sleep keyTypeDelay}
def search = Future{searchTweets}

Suppose further that an implicit conversion from futures to scripts is in scope. Then we can use the futures as follows:

mainSeq = anyEvent(view.searchField)
          delay
          search ~~(ts: Seq[Tweet])~~>updateView(ts)
               +~/~(t : Throwable )~~>setErrorMsg(t)