13

Fork-Join Parallelism with Futures


13.1 Parallel Computation using Futures239
13.2 N-Ways Parallelism242
13.3 Parallel Web Crawling245
13.4 Asynchronous Futures250
13.5 Asynchronous Web Crawling254

def fetchAllLinksParallel(startTitle: String, depth: Int): Set[String] =
  var seen = Set(startTitle)
  var current = Set(startTitle)
  for i <- Range(0, depth) do
    val futures = for title <- current yield Future{ fetchLinks(title) }
    val nextTitleLists = futures.map(Await.result(_, Inf))
    current = nextTitleLists.flatten.filter(!seen.contains(_))
    seen = seen ++ current
  seen
13.1.scala

Snippet 13.1: a simple parallel web-crawler implemented using Scala Futures

The Scala programming language comes with a Futures API. Futures make parallel and asynchronous programming much easier to handle than working with traditional techniques of threads, locks, and callbacks.

This chapter dives into Scala's Futures: how to use them, how they work, and how you can use them to parallelize data processing workflows. It culminates in using Futures together with the techniques we learned in Chapter 12: Working with HTTP APIs to write a high-performance concurrent web crawler in a straightforward and intuitive way.

For this chapter, we will use the Scala CLI REPL:

> import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors

> given ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
13.2.scala

We import the common classes and functions to let us work with Futures from the scala.concurrent package. We also import the duration scala.concurrent.duration.Inf, and construct an ExecutionContext using a newFixedThreadPool with 8 threads as a reasonable default to get started with. You can control the level of parallelism by changing the number of threads. There are also other kinds of ExecutionContext available for more advanced use cases.

Note that in a larger application, ExecutionContexts created from thread pools need to be explicitly terminated using .shutdown() when no longer needed (e.g. in a finally block at the end of the method in which they are used, or at the end of the main method if they are global). Thread pools are somewhat expensive to instantiate, so where possible you typically want to share them throughout your program rather than re-instantiating them everywhere they are needed.

13.1 Parallel Computation using Futures

As an example computation we can parallelize using Futures, we will be using an implementation of the BCrypt hash function from the following repository. We can load this into our Scala REPL and define a def hash function that we can use to hash files.

$ ./mill --import "at.favre.lib:bcrypt:0.9.0" --repl

> import at.favre.lib.crypto.bcrypt.{BCrypt, LongPasswordStrategies}

> val hasher = BCrypt.`with`(
    LongPasswordStrategies.hashSha512(BCrypt.Version.VERSION_2A))

> def hash(name: String) =
    val salt = name.take(16).padTo(16, ' ').getBytes
    val bytes = hasher.hash(/*difficulty*/ 17, salt, os.read.bytes(os.pwd / name))
    String(bytes)
13.3.scala

We will be using the four following files as the data we will hash throughout this chapter:

  • Chinatown.jpg (https://github.com/handsonscala/handsonscala/tree/v2/resources/13)
  • Kresge.jpg (https://github.com/handsonscala/handsonscala/tree/v2/resources/13)
  • Memorial.jpg (https://github.com/handsonscala/handsonscala/tree/v2/resources/13)
  • ZCenter.jpg (https://github.com/handsonscala/handsonscala/tree/v2/resources/13)

With the difficulty level of hashing set to 17, hashing one of these files takes a significant amount of time to complete. This is clear if we define a timer helper function and then run hash on e.g. Chinatown.jpg:

> def time[T](op: => T) =
    val start = System.nanoTime
    val value = op
    val taken = System.nanoTime - start
    (value, duration.FiniteDuration(taken, "nanos"))

> time{ hash("Chinatown.jpg") }
res0: (String, scala.concurrent.duration.FiniteDuration) = (
  "$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2",
  10942495521 nanoseconds
)
13.4.scala

Using the time{} function, we can see that this takes about 10 seconds to complete, depending on the hardware you are running on.

13.1.1 Sequential Code

Consider the following code that executes two call to hash sequentially. We can see that this takes about 20 seconds to run:

> time{
    println(hash("Chinatown.jpg"))
    println(hash("ZCenter.jpg"))
  }
$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2
$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zjhQLWi
res1: (Unit, scala.concurrent.duration.FiniteDuration) = ((), 20273416425 nanoseconds)
13.5.scala

Typical Scala code, like most programming languages, executes sequentially: line by line from top to bottom, left to right within each line. If we can run both hash calls in parallel, we should be able to do it in half the time: about 10 seconds. That is where Futures come in.

13.1.2 Spawning Futures

The basic building block is scala.concurrent.Future, which we have imported as simply Future. Futures define lightweight tasks that run on a thread pool, perform some computation, and return a value. We construct a simple thread pool earlier (Executors.newFixedThreadPool(8)), but there a range of options you can choose from (newWorkStealingPool, newSingleThreadExecutor, etc.).

To create a future, use the Future{ ... } syntax:

> val f1 = Future{ "hello" + 123 + "world" }
f1: scala.concurrent.Future[String] = Future(Success(hello123world))

> val f2 = Future{ hash("Chinatown.jpg") }
f2: scala.concurrent.Future[String] = Future(<not completed>)
13.6.scala

We can see that the first Future f1 is already completed when printed out. That makes sense, as the computation is simple and would complete almost instantly, even before the result is printed to the console. The second Future f2 uses the hash function we defined earlier, and therefore takes longer and is listed as <not completed>. However, both lines val f1 = and val f2 = complete instantly! This is because when we create a Future, it runs in the background on a thread pool and we can continue doing other things while it is working.

If we wait about 10 seconds and ask for f2 again, we will see it complete and print out the hash of the file:

> f2
res2: scala.concurrent.Future[String] = Future(Success($2a$17$O0fnZkDyZ1b...))
13.7.scala

13.1.3 Offloading Work from the Main Thread

A Future's computation returns the single value at the end of the Future{ ... } block. When we want the return value of the Future, we can use Await.result to get it:

> val f = Future{ hash("Chinatown.jpg") }
f: scala.concurrent.Future[String] = Future(<not completed>)

> Await.result(f, Inf)
res3: String = "$2a$17$O0fnZkDyZ1bsJinOPw.eG.H80jYKe4v1rAF8k5sH9uRue4tma50rK"
13.8.scala

Even just creating a single Future to run in the background can be useful, as you can have your code do other things while the Future is running. For example, we can run one hash call inside the Future, do another hash call on the main thread, and then use Await.result to wait for the background call to complete:

> val f = Future{ hash("Chinatown.jpg") }
f: scala.concurrent.Future[String] = Future(<not completed>)

> val result = hash("ZCenter.jpg")
result: String = "$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zj..."

> val backgroundResult = Await.result(f, Inf)
backgroundResult: String = "$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefg..."
13.9.scala

We can visualize the execution of the main thread and future as follows:

futures cluster_0 Main Thread cluster_1 Thread Pool start start val f = val f = start->val f = hash(ZCenter.jpg) hash(ZCenter.jpg) val f =->hash(ZCenter.jpg) hash(Chinatown.jpg) hash(Chinatown.jpg) val f =->hash(Chinatown.jpg) Await.result Await.result hash(ZCenter.jpg)->Await.result end end Await.result->end hash(Chinatown.jpg)->Await.result

What Await.result does depends on whether the Future is completed when the Await.result is called on the main thread:

  • If the Future is already completed, then Await.result immediately returns the value computed by the Future for you to use

  • If the Future has not yet completed, then Await.result will wait until the value is ready before returning it

In both cases, you do not need to worry about using mutable variables, locking, or race conditions when transferring the result of the background Future back to the main thread: the Await.result function does that for you automatically, ensuring that your main thread will be given the backgroundResult value to use once it is ready.

13.2 N-Ways Parallelism

We have so far looked at two cases:

  1. Sequential code
  2. Running one half the work in a background Future and one half on the main thread

Let's now consider a third case: splitting a trivially-parallel computation into N parts, and running it N-ways parallel in the background.

13.2.1 N Sequential Computations

First, consider the following sequential computation, which runs our def hash function over every file in the process working directory. Below, we can see that the process overall takes about 39 seconds, running sequentially in a for-comprehension:

> val (hashes, duration) = time{
    for p <- os.list(os.pwd) yield
      println("Hashing " + p)
      hash(p.last)
  }
Hashing /Users/lihaoyi/test/Chinatown.jpg
Hashing /Users/lihaoyi/test/Kresge.jpg
Hashing /Users/lihaoyi/test/Memorial.jpg
Hashing /Users/lihaoyi/test/ZCenter.jpg
val hashes: IndexedSeq[String] = ArraySeq(
  "$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2",
  "$2a$17$Q1Hja0bjJknuXw.eGA.eG.KQ7bQl8kQbzR4sTBdT97icinfz5xh66",
  "$2a$17$RUTrZ1HnWUusYl/lGA.eG.TK2UsZfBYw6mLhDyORr659FFz2lPwZK",
  "$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zjhQLWi"
)
val duration: scala.concurrent.duration.FiniteDuration = 38949493436 nanoseconds
13.10.scala

13.2.2 N Parallel Computations

Because hashing one file is totally independent of hashing any other file, this computation is trivially parallelizable, and we can use Futures to utilize multiple cores to do the hashing in parallel. This involves kicking off a separate Future{...} block for every file we want to hash, and then waiting for all the results at the end:

 val (hashes, duration) = time{
-  for p <- os.list(os.pwd) yield
+  val futures = for p <- os.list(os.pwd) yield Future{
     println("Hashing " + p)
     hash(p.last)
+  }
+  futures.map(Await.result(_, Inf))
 }
13.11.scala

When we run this parallelized code snippet in the REPL, it produces the following output:

Hashing /Users/lihaoyi/test/Chinatown.jpg
Hashing /Users/lihaoyi/test/Kresge.jpg
Hashing /Users/lihaoyi/test/ZCenter.jpg
Hashing /Users/lihaoyi/test/Memorial.jpg
val hashes: IndexedSeq[String] = ArraySeq(
  "$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2",
  "$2a$17$Q1Hja0bjJknuXw.eGA.eG.KQ7bQl8kQbzR4sTBdT97icinfz5xh66",
  "$2a$17$RUTrZ1HnWUusYl/lGA.eG.TK2UsZfBYw6mLhDyORr659FFz2lPwZK",
  "$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zjhQLWi"
)
val duration: scala.concurrent.duration.FiniteDuration = 10292549149 nanoseconds
13.12.output-scala

While earlier we had two Futures which we Awaited on individually, now we have a whole list of Futures (val futures) that we map over to await on all of them.

Note that the order in which the independent Futures are executed is arbitrary: the four Hashing ... messages printed by our parallel hashing code are in a different order than our previous sequential execution, and may be in different orders if we re-run the parallel hashing multiple times. However, when we run futures.map(Await.result(_, Inf)), the final collection of hashes will be assembled in the same order the Futures were created regardless of which order the hashing actually happened.

We can see that the total is down from 39 seconds to 10 seconds (10,292,549,149 nanoseconds). This is expected, since the Futures running on a background threadpool utilizes all CPU cores to run in parallel:

futures cluster_0 Main Thread cluster_1 Thread Pool start start val futures = val futures = start->val futures = hash(Chinatown.jpg) hash(Chinatown.jpg) val futures =->hash(Chinatown.jpg) hash(ZCenter.jpg) hash(ZCenter.jpg) val futures =->hash(ZCenter.jpg) hash(Memorial.jpg) hash(Memorial.jpg) val futures =->hash(Memorial.jpg) hash(Kresge.jpg) hash(Kresge.jpg) val futures =->hash(Kresge.jpg) Await.result Await.result end end Await.result->end hash(Chinatown.jpg)->Await.result hash(ZCenter.jpg)->Await.result hash(Memorial.jpg)->Await.result hash(Kresge.jpg)->Await.result
See example 13.1 - Hashing

13.2.3 Futures vs Threads

Unlike Threads, using Futures is often an easy way to make use of parallelism for minimal extra effort: here a three-line code change! You do not need to deal with the error-prone locks, queues, volatiles and atomics that typically permeate multi-threaded code. These construct are still used under the hood, but the Futures framework encapsulates it so you usually do not need to work with them directly.

Futures are also much cheaper than threads, and can be used much more freely:

  • The memory overhead of a thread is typically has several megabytes, while that of a Future is less than a kilobyte

  • Switching between threads thread takes several microseconds, while switching between Futures takes a tenth of a microsecond

As a result, while you generally want to stay below 1,000 threads due to performance and memory concerns, you can easily have 100,000s or 1,000,000s of Futures without issue. In this case, even if the folder had a large number of files within it, creating one Future per file is unlikely to cause issues.

Like Threads, the order in which Futures evaluate is arbitrary, and may differ between runs of your program. However, as long as the value being evaluated by each Future does not have side effects, the non-deterministic order of evaluation will not affect the behavior or correctness of your code. You thus should ensure the computations you are parallelizing with Futures are "pure" and only return a value with no observable side effects: if not, the side effects may occur in a different order each time your code runs, resulting in confusing and non-deterministic bugs.

13.3 Parallel Web Crawling

Apart from parallelizing CPU-bound code, Futures can also be useful for parallelizing network-bound code. We will now explore how to use Futures to parallelize a common task (and a common interview exercise!): writing a parallel web crawler.

Let us assume the task is as follows:

Given the title of a page on Wikipedia (e.g. "Albert Einstein"), write a program that will fetch the set of all pages within a certain number depth of links from the root page, and do so in parallel for performance.

There are several ways of approaching this exercise, which is essentially to implement a traversal of the graph of Wikipedia pages and the links between them. You can do the traversal either breadth-first or depth-first, and then there are many ways you could implement parallelism. For now we will consider just one approach: a breadth first traversal, parallelized using Futures.

13.3.1 A Single HTTP Request

For example, consider the following code to make a HTTP request and fetch all the links on a Wikipedia page:

FetchLinks.scala
def fetchLinks(title: String): Seq[String] =
  val resp = requests.get(
    "https://en.wikipedia.org/w/api.php",
    params = Seq(
      "action" -> "query",
      "titles" -> title,
      "prop" -> "links",
      "format" -> "json"
    )
  )
  for
    page <- ujson.read(resp)("query")("pages").obj.values.toSeq
    links <- page.obj.get("links").toSeq
    link <- links.arr
  yield link("title").str13.13.scala

This makes use of the requests library to fetch the data from Wikipedia, and the ujson library to parse it, similar to what we did in Chapter 12: Working with HTTP APIs. You can add a println(resp.text()) if you would like to see the raw response from Wikipedia, but for this chapter it is enough to know that this function can be called on any Wikipedia page to return a list of pages that are linked from it:

> val links = fetchLinks("Albert Einstein")
links: Seq[String] = Seq(
  "20th Century Press Archives",
  "2dF Galaxy Redshift Survey",
  "A priori and a posteriori",
  "Aage Bohr",
  "Aarau",
  "Aargau",
  "Abba Eban",
  "Abdominal aortic aneurysm",
  "Abdus Salam",
  "Absent-minded professor"
)
13.14.scala

For simplicity, let's ignore the fact that Wikipedia only returns the first 10 links on each page by default, and requires pagination to access the full list. This fetchLinks function will be the foundation on which we build our Wikipedia crawler.

13.3.2 Sequential Crawling

First, let us write the function that does a simple breadth-first traversal of the page-link graph:

Crawler.scala//| moduleDeps: [./FetchLinks.scala]
def fetchAllLinks(startTitle: String, depth: Int): Set[String] =
  var seen = Set(startTitle)
  var current = Set(startTitle)
  for i <- Range(0, depth) do
    val nextTitleLists = for title <- current yield fetchLinks(title)
    current = nextTitleLists.flatten.filter(!seen.contains(_))
    seen = seen ++ current
  seen13.15.scala
See example 13.2 - Crawler

Unlike the breadth-first search we implemented in Chapter 6: Implementing Algorithms in Scala, we do not maintain an explicit queue of links to process, and instead simply process all depth 1 links, then all depth 2 links, etc. until we get to the depth we want. The current set keeps track of what pages belong to the current depth, and the seen set prevents us from redundantly processing a page more than once. Loading this into the REPL using ./mill -i Crawler.scala:repl, we can call fetchAllLinks to traverse the page-link graph to various depths:

> fetchAllLinks("Singapore", 2)
res4: Set[String] = HashSet(
  "1907 Vancouver anti-Asian riots",
  "14th G-15 summit",
  "1945 Yugoslavian parliamentary election",
  "1819 Singapore Treaty",
  ".ai",
...
13.16.scala

13.3.3 Parallel Crawling

Adding parallelism is as simple as performing each batch of fetches in parallel using Future{ ... }, then aggregating the results using Await.result:

-       val nextTitleLists = for title <- current yield fetchLinks(title)
+       val futures = for title <- current yield Future{ fetchLinks(title) }
+       val nextTitleLists = futures.map(Await.result(_, Inf))
13.17.scala

The final code looks like this:

Crawler.scala//| moduleDeps: [./FetchLinks.scala]
import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors
import duration.*
val service = Executors.newFixedThreadPool(8)
given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)
def fetchAllLinksParallel(startTitle: String, depth: Int): Set[String] =
  var seen = Set(startTitle)
  var current = Set(startTitle)
  for i <- Range(0, depth) do
    val futures = for (title <- current) yield Future{ fetchLinks(title) }
    val nextTitleLists = futures.map(Await.result(_, Inf))
    current = nextTitleLists.flatten.filter(!seen.contains(_))
    seen = seen ++ current
  seen13.18.scala

This parallelizes each batch of fetchLinks calls, waiting until all calls in that batch complete before aggregating the results to prepare the next batch. We can then use this function the same way as the non-parallel version, returning the same results:

> fetchAllLinksParallel("Singapore", 2)
res5: Set[String] = HashSet(
  "1907 Vancouver anti-Asian riots",
  "14th G-15 summit",
  "1945 Yugoslavian parliamentary election",
  "1819 Singapore Treaty",
  "2nd People's Defence Force",
...
13.19.scala

fetchAllLinksParallel executes as follows:

futures cluster_0 Main Thread cluster_1 Thread Pool start start fut1 val futures = start->fut1 depth 0 fetchLinks1 fetchLinks1 fut1->fetchLinks1 wait1 Await.result fut2 val futures = wait1->fut2 depth 1 fetchLinks2 fetchLinks2 fut2->fetchLinks2 fetchLinks3 fetchLinks3 fut2->fetchLinks3 fetchLinks4 fetchLinks4 fut2->fetchLinks4 wait2 Await.result fut3 val futures = wait2->fut3 depth 2 fetchLinks5 fetchLinks5 fut3->fetchLinks5 fetchLinks6 fetchLinks6 fut3->fetchLinks6 fetchLinks7 fetchLinks7 fut3->fetchLinks7 fetchLinks8 fetchLinks8 fut3->fetchLinks8 fetchLinks9 fetchLinks9 fut3->fetchLinks9 wait3 Await.result end end wait3->end fetchLinks1->wait1 fetchLinks2->wait2 fetchLinks3->wait2 fetchLinks4->wait2 fetchLinks5->wait3 fetchLinks6->wait3 fetchLinks7->wait3 fetchLinks8->wait3 fetchLinks9->wait3

13.3.4 Testing our Parallel Webcrawler

Comparing the output of fetchAllLinks and fetchAllLinksParallel, it is easy to verify that the output of the two functions is identical:

> fetchAllLinks("Singapore", 2) == fetchAllLinksParallel("Singapore", 2)
res6: Boolean = true

> fetchAllLinks("Singapore", 3) == fetchAllLinksParallel("Singapore", 3)
res7: Boolean = true
13.20.scala

Using the time{} function again, we can see that the parallel version completes significantly faster:

> time{fetchAllLinks("Singapore", 2)}._2
res8: scala.concurrent.duration.FiniteDuration = 4719789996 nanoseconds

> time{fetchAllLinksParallel("Singapore", 2)}._2
res9: scala.concurrent.duration.FiniteDuration = 1342978751 nanoseconds
13.21.scala
> time{fetchAllLinks("Singapore", 3)}._2
res10: scala.concurrent.duration.FiniteDuration = 31061249346 nanoseconds

> time{fetchAllLinksParallel("Singapore", 3)}._2
res11: scala.concurrent.duration.FiniteDuration = 4569134866 nanoseconds
13.22.scala

At depth 2, the parallel crawler takes 1.3s instead of 4.7s. At depth 3, the parallel crawler takes 4.6s instead of 31.0s. This is a significant speedup for just a three-line code change!

13.3.5 Thread Pool and Parallelism Limits

In this case, we are getting a maximum of about 8x parallelism because the thread pool we constructed has 8 threads.

As a rule of thumb, you generally want to keep the number of threads less than 1,000 to limit resource usage and thread contention. Here the requests.get method we use in fetchAllLinks is blocking - it needs at least one thread per open HTTP request - and thus we are limited in how many requests we can send in parallel. Higher levels of concurrency can be achieved by using Futures asynchronously, which we will cover later when we discuss Asynchronous Futures (13.4).

Another limitation may be the service you are querying, in this case Wikipedia. Most third-party services have rate limits meant to stop you from flooding them with requests, and even for those that don't you generally still want to avoid overloading their backend systems. By changing the size of the thread pool, you can control the degree of parallelism to avoid causing issues in the service you are interacting with.

As written, the amount of parallelism we can get is still sub-optimal: If one HTTP request takes a long time, it prevents the next batch from starting even if all other parallel HTTP requests have already completed. This is inherent in the batch-by-batch way we are doing parallelism. Improving fetchAllLinksParallel to better handle this case is left as an exercise to the reader, and can be done using techniques from Chapter 16: Message-based Parallelism with Actors.

13.4 Asynchronous Futures

So far we have been using Await.result to wait for a Future to complete before we can make use of its result. This makes it convenient to pass results between our parallel Futures, but requires the system thread calling Await.result to stop and wait for the Future to complete. Threads have overhead, so while 10 or 100 threads waiting around is not an issue, in high-concurrency systems having 1,000s of threads doing nothing is a significant waste of memory and other resources.

Futures can also work asynchronously: we can schedule work and interface with asynchronous callbacks without wasting a thread waiting for a result. This can greatly improve performance in high-concurrency scenarios.

There are two main ways to work with Futures asynchronously: interfacing with external callback-based APIs using Promises, and via asynchronous operators such as the map, foreach, zip, sequence, and flatMap and operations. We will cover both of them in this section.

13.4.1 Futures and Promises

So far we have only been creating Futures via Future{ ... } blocks, which schedules the code in the block to run on a background thread pool. Another way to create Futures is via scala.concurrent.Promise:

> def doThing(succeed: Boolean) =
    val p: Promise[String] = Promise[String]
    val f: Future[String] = p.future
    f.onComplete:
      case scala.util.Success(res) => println(s"Success! $res")
      case scala.util.Failure(exception) => println(s"Failure :( $exception")
    if succeed then p.success("Yay!")
    else p.failure(Exception("boom"))

> doThing(succeed = true)
Success! Yay!
res12: scala.concurrent.Promise[String] = Future(Success(Yay!))

> doThing(succeed = false)
Failure :( java.lang.Exception: boom
res13: scala.concurrent.Promise[String] = Future(Failure(java.lang.Exception: boom))
13.23.scala

A Promise is like the "write" side of a Future, while the Future is the "read" side. Completing the Promise using .success or .failure, does two things:

  • Passes the value to anyone waiting on the Future via Await.result
  • Triggers any callbacks attached to the Future via .onComplete

Note that Futures and Promises can complete either successfully with a result or fail with an exception: in the failure case Await.result simply re-throws the exception, while onComplete requires you to handle both cases.

Using Promises to trigger Futures demonstrates that a Future is not necessarily a background task running on a thread pool: it is a more general construct, an operation that may complete asynchronously some time later. A task running on a thread pool is one such operation, but any operation with some kind of "on success" callback can also be modeled using Future.

13.4.2 Interfacing with Callbacks using Promises

Futures and Promises are equivalent to callbacks. Any function that takes success and error callbacks can be mechanically converted to a function that returns a Future, and vice versa:

13.4.2.1 Converting Futures to Callbacks

def futureFunc(): Future[String] = ...

def callbackFunc(onSuccess: String => Unit, onError: Throwable => Unit): Unit =
  futureFunc().onComplete:
    case scala.util.Success(str) => onSuccess(str)
    case scala.util.Failure(ex) => onError(ex)
13.24.scala

13.4.2.2 Converting Callbacks to Futures

def futureFunc(): Future[String] =
  val p = Promise[String]
  callbackFunc(
    onSuccess = str => p.success(str),
    onError = ex => p.failure(ex)
  )
  p.future

def callbackFunc(onSuccess: String => Unit, onError: Throwable => Unit): Unit = ...
13.25.scala

While much of the Scala ecosystem uses Futures, the broader Java ecosystem has many libraries that design their asynchronous interfaces around callbacks. This inter-operability makes it straightforward to wrap any sort of asynchronous operation in a Future, allowing you to use it seamlessly from your Future-using code. We will see an example of this later, when we explore Asynchronous Web Crawling (13.5).

13.4.3 Asynchronous Operators

If we are working with Futures asynchronously, we generally want to avoid using Await.result to pass the result of one Future to another. While we can chain together Futures using callbacks, doing so is often inconvenient and error-prone. Scala's Future data type contains some useful methods that allow you to combine and transform the result of your Futures in a few common ways, making asynchronous computations both efficient and convenient.

In this section, we look at a few synchronous usages of Futures via Await.result (left) and see how we can instead write the code using asynchronous operators (right) to do the same computation but without blocking a thread waiting for results.

13.4.3.1 map, foreach

map lets you transform the result of a Future using a function, while foreach is similar to the onComplete method we saw earlier, except it only handles successful results. For example, rather than using Await.result to wait for the value of the hash and then calling length before printing it, we can instead use map to schedule the length call to happen whenever the hashing Future completes, and then foreach to schedule the println to happen after length computation completes:

sync val fut = Future{
   hash("Chinatown.jpg")
 }
-val res = Await.result(fut, Inf).length
-println(res)13.26.scala
async val fut = Future{
   hash("Chinatown.jpg")
 }
+val res = fut.map(_.length)
+res.foreach(println)13.27.scala

This allows us to perform our length computation and println action asynchronously, without blocking the main thread. Just like how the call to Future{ ... } returns immediately even if the background computation has not yet completed, the .map and .foreach calls also return immediately and free up the calling thread to do other things.

13.4.3.2 zip

sync val fut1 = Future{
   hash("Chinatown.jpg")
 }
 val fut2 = Future{ hash("ZCenter.jpg") }
-val hash1 = Await.result(fut1, Inf)
-val hash2 = Await.result(fut2, Inf)
-val joined = s"$hash1 $hash2"
-println(joined)13.28.scala
async val fut1 = Future{
   hash("Chinatown.jpg")
 }
 val fut2 = Future{ hash("ZCenter.jpg") }
+val zipped = fut1.zip(fut2)
+val joined = zipped.map:
+  (hash1, hash2) => s"$hash1 $hash2"
+joined.foreach(println)13.29.scala

The zip operator lets you combine a Future[T] and a Future[V] into a Future[(T, V)] that returns a tuple. We can then map on it to process the tuple, and foreach to print the result. In general, where we were previously using Await.result on multiple different Futures before proceeding with our computation, we can instead use zip and map to perform the same logic asynchronously.

13.4.3.3 sequence

sync val files = Seq(
   "Chinatown.jpg", "ZCenter.jpg",
   "Kresge.jpg", "Memorial.jpg"
 )
 val futs = files.map: s =>
   Future{ hash(s) }
-val hashes =
-  futs.map(Await.result(_, Inf))
-val joined = hashes.mkString(" ")
-println(joined)13.30.scala
async val files = Seq(
   "Chinatown.jpg", "ZCenter.jpg",
   "Kresge.jpg", "Memorial.jpg"
 )
 val futs = files.map: s =>
   Future{ hash(s) }
+val hashes = Future.sequence(futs)
+val joined =
+  hashes.map(_.mkString(" "))
+joined.foreach(println)13.31.scala

Future.sequence lets you convert a collection of Futures Seq[Future[T]] into a Future returning a collection Future[Seq[T]]. Where we previous used .map(Await.result(_, Inf)) to aggregate a sequence of asynchronous results, Future.sequence lets us do the aggregation asynchronously. We can then use map or foreach to further process the Seq[T] that the sequenced Future computes.

13.5 Asynchronous Web Crawling

To make use of what we have learned about asynchronous Futures, let us convert the parallel-but-synchronous web crawler we wrote earlier in Parallel Web Crawling (13.3) to make it asynchronous. We want to avoid using a thread for every HTTP request, and avoid blocking the main thread using Await.result. This will allow us to ramp up the concurrency: we could make thousands of concurrent web requests, as many as the network bandwidth will allow, without worrying about per-thread overhead.

13.5.1 Web Crawling via Recursion

The first step to asynchronous crawling is to convert our for-loop into a recursive function. We need to do this as for-loops are inherently synchronous, whereas recursion gives us the flexibility to evaluate either synchronously or asynchronously. We can do that conversion as follows:

Crawler.scala//| moduleDeps: [./FetchLinks.scala]

import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors
val service = Executors.newFixedThreadPool(8)
given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)
def fetchAllLinksRec(startTitle: String, depth: Int): Set[String] =
  def rec(current: Set[String], seen: Set[String], recDepth: Int): Set[String] =
    if recDepth >= depth then seen
    else
      val futures = for title <- current yield Future{ fetchLinks(title) }
      val nextTitles = futures.map(Await.result(_, Inf)).flatten
      rec(nextTitles.filter(!seen.contains(_)), seen ++ nextTitles, recDepth + 1)
  rec(Set(startTitle), Set(startTitle), 0)13.32.scala

For now, we are simply converting the fetchAllLinksParallel implementation from using a loop to using recursion, while preserving its existing behavior. In the conversion from a loop to recursion, there are a few major changes:

  • The for-loop has now become a recursive function rec whose implementation contains most of the logic that was previously in the loop body

  • We call rec once at the bottom of fetchAllLinksRec in order to start the recursion

  • The current and seen variables are now instead parameters of rec, which is a recursive function defined within fetchAllLinksAsync

  • We now need to keep track of recDepth of each recursion ourselves and terminate the recursion once the desired depth has been reached

  • If we do not wish the recursion to terminate, we call rec recursively to make it execute another time

This conversion of a loop to recursion is a bit tricky, but sometimes necessary. Here it lets us make the looping logic asynchronous. For now, fetchAllLinksRec should behave in exactly the same way as our earlier fetchAllLinksParallel function: it is still synchronous, and still calls the same synchronous fetchLinks function we defined earlier. The only difference is that we converted the for-loop to recursion. Next, we will look at converting this to allow asynchronous crawling.

Next, we need to make our fetchLinks function asynchronous. The requests package we were using earlier is optimized for simple synchronous operation, so we need to find a replacement. Luckily, the Java ecosystem has a wealth of such libraries, such as the popular AsyncHttpClient library:

While this is a Java library, it is straightforward to use it from our Scala code. Going through the documentation, we first need to create a asyncHttpClient(), and after that it's straightforward to turn our requests.get call into a asyncHttpClient.execute():

FetchLinksAsync.scala//| mvnDeps:
//| - org.asynchttpclient:async-http-client:2.5.2
import scala.concurrent.*

val asyncHttpClient = org.asynchttpclient.Dsl.asyncHttpClient() // remember to close

def fetchLinksAsync(title: String)(using ec: ExecutionContext): Future[Seq[String]] =
  val p = Promise[String]
  val listenableFut = asyncHttpClient.prepareGet("https://en.wikipedia.org/w/api.php")
    .addQueryParam("action", "query").addQueryParam("titles", title)
    .addQueryParam("prop", "links").addQueryParam("format", "json")
    .execute()

  listenableFut.addListener(() => p.success(listenableFut.get().getResponseBody), null)
  val scalaFut: Future[String] = p.future
  scalaFut.map: responseBody =>
    for
      page <- ujson.read(responseBody)("query")("pages").obj.values.toSeq
      links <- page.obj.get("links").toSeq
      link <- links.arr
    yield link("title").str13.33.scala

Note that while execute returns a Java org.asynchttpclient.ListenableFuture, which is different from the scala.concurrent.Futures we have been using so far, it is straightforward to use Interfacing with Callbacks using Promises (13.4.2) to convert it into a Scala future scalaFut. We then use map on scalaFut to take the response body String and extract the titles, like what we saw earlier. fetchAllLinksAsync then returns a Future[Seq[String]], indicating that the results are not available immediately when the function call returns, but will be made available some time in the future.

The fetchLinksAsync method above is defined to take a using ec: ExecutionContext as a parameter. This ensures that any asynchronous operations fetchLinksAsync performs share thread pool of whichever method is calling it. Typically, an application will only instantiate one thread pool and share it throughout all methods that need it via a using parameter. Thread pools are relatively expensive to instantiate, so you generally want to share them rather than re-instantiating a separate one for every method.

Sharing a common thread pool also makes your application convenient to re-configure: if you find yourself deploying your application on more powerful hardware, it is easy to change the number to threads a the shared thread pool uses in one place. Similarly, if you want to run your application single-threaded to help narrow down a tricky bug, you can make your entire application run on a newSingleThreadExecutor to eliminate any parallelism-related complexity.

13.5.3 Asynchronous Recursive Web Crawling

Lastly, we can combine our asynchronous fetchAllLinksAsync method with our recursive fetchAllLinksRec method to produce a fetchAllLinksAsync method that does an asynchronous crawl of the Wikipedia API:

Crawler.scala//| moduleDeps: [./FetchLinksAsync.scala]
import scala.concurrent.*, java.util.concurrent.Executors
val service = Executors.newFixedThreadPool(8)
given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)

def fetchAllLinksAsync(startTitle: String, depth: Int): Future[Set[String]] =
  def rec(current: Set[String], seen: Set[String], recDepth: Int): Future[Set[String]] =
    if recDepth >= depth then Future.successful(seen)
    else
      val futures = for title <- current yield fetchLinksAsync(title)
      Future.sequence(futures).map: nextTitleLists =>
        val nextTitles = nextTitleLists.flatten
        rec(nextTitles.filter(!seen.contains(_)), seen ++ nextTitles, recDepth + 1)
      .flatten
  rec(Set(startTitle), Set(startTitle), 0)13.34.scala

The above definition of fetchAllLinksAsync is a modified version of fetchAllLinksRec. In making our Wikipedia crawler asynchronous, there are the following major changes:

  • fetchAllLinksAsync now both return Future[Set[String]], as does rec

  • Rather than spawning a bunch of Futures using Future{ ... } blocks, we now directly call fetchLinksAsync on every value in current

  • We use Future.sequence to aggregate each batch of results, rather than calling Await.result of every item, and use map to perform the further computation.

  • In order to ensure that all branches of rec return a Future[Set[String]], we need to wrap the base case seen in a Future.successful(...) so we don't return a bare Set[String], and call .flatten on the recursive case so we don't return a nested Future[Future[Set[String]]

Note that the .map{...}.flatten pattern can also be written as a single .flatMap{...}, which is a common operator when working with asynchronous workflows using Futures.

We can now run this and print the results follows:

> fetchAllLinksAsync("Singapore", 3).foreach(println)
HashSet(Cheval de frise, 24 Horas (Spanish TV channel), 2nd People's Defence Force, ...
13.35.scala

The call to fetchAllLinksAsync returns instantly, returning the Future[Set[String]]. Here we use foreach to println on the result when it is ready, but we can also combine it into a larger asynchronous Futures workflow using the same map, zip and sequence operations we saw earlier.

Now that we are no longer limited in parallelism by the number of threads available, deeper crawls with a lot more potential concurrency finish far quicker. With a crawl depth of 4, fetchAllLinksParallel finishes in about 11.3 seconds while fetchAllLinksAsync finishes in 2.6 seconds:

> val (res, ns) = time{ fetchAllLinksParallel("Singapore", 4) }
res: Set[String] = HashSet("Ascension Island", "Baba House", ...)
val ns: scala.concurrent.duration.FiniteDuration = 11358217828 nanoseconds

> val (res, ns) = time{ Await.result(fetchAllLinksAsync("Singapore", 4), Inf)}
res: Set[String] = HashSet("Ascension Island", "Baba House", ...)
val ns: scala.concurrent.duration.FiniteDuration = 2620180174 nanoseconds
13.36.scala

In general, asynchronous concurrency allows a much greater degree of concurrency than blocking parallelism, but only for non-CPU-bound code such as the HTTP requests we are making above. Your system may still be bottle-necked in other places - network bandwidth, server rate-limiting, etc. - but it will no longer be limited by the number of threads available. While this asynchronous implementation is not necessary for small-scale crawlers, it can be useful when the number of web pages is large, and allow a single computer to crawl thousands of web pages at a time.

The implementation of fetchAllLinksAsync might look a bit unusual, but it is a relatively simple piece of code: 20 lines of code to make asynchronous HTTP requests, and 15 lines of code for the traversal logic. If we had implemented this using threads, or directly using the callbacks provided by AsyncHttpClient, we would need orders of magnitude more code to implement our parallel, asynchronous, breadth-first traversal of the graph of Wikipedia articles over the network.

13.6 Conclusion

In this chapter, we have introduced the usage of Scala's Futures as a way of easily starting parallel background tasks and aggregating their results, as well as a way of scheduling asynchronous workflows. We have covered:

  • Creating and waiting on Futures
  • Using Futures to parallelize compute-bound code (hashing files)
  • Writing a parallel web crawler that performs a breadth-first traversal of Wikipedia's page-link graph
  • Writing asynchronous code using Futures: interfacing with callbacks and asynchronous operators

Futures fit best in a "fork-join" model of parallelism: we split off independent tasks, run them in parallel, and then aggregate the results. This works best when each task is independent and pure, without internal state, and indifferent to what order it happens in. In other cases, there is still parallelism to be had: we will cover such scenarios later, in Chapter 16: Message-based Parallelism with Actors.

We have now spent the last three chapters interacting with HTTP servers and services from a client's perspective. The next two chapters, we will see it from the server's point of view: how to set up the HTML websites, JSON APIs, and backing databases that make up a typical web service looks like today.

Exercise: Use Futures to parallelize the MDN Web API Documentation scraper we wrote in Chapter 11: Scraping Websites, allowing it to run N ways parallel on the background thread pool.

See example 13.6 - ParallelScrapingDocs

Exercise: Take the generic merge sort from Chapter 6: Implementing Algorithms in Scala and parallelize it using Futures. As the number of Futures may be large, we should avoid blocking threads using Await and instead use Asynchronous Operators (13.4.3) where possible. Compare the performance of the parallel and sequential merge sorts: is the parallel merge sort faster? How is the speed affected if we fall back to sequential merge sorts when the sequences become small?

See example 13.7 - ParallelMergeSort

Exercise: Modify our asynchronous wikipedia crawler to limit the concurrent requests it makes to a configurable number, e.g. 32, to avoid being rate-limited or overloading the server we are fetching data from.

See example 13.8 - AsyncCrawlerThrottled

Exercise: Make use of the AsyncHttpClient we used in this chapter, together with the Jsoup.parse function we saw in Chapter 11: Scraping Websites, to write an asynchronous MDN Web API Documentation scraper that can run in parallel without being limited by the number of threads. To avoid overloading the network or server, limit the number of concurrent requests to 32.

See example 13.9 - AsyncThrottledScrapingDocs
Discuss Chapter 13 online at https://www.handsonscala.com/discuss/13