13.1 Parallel Computation using Futures
13.2 N-Ways Parallelism
13.3 Parallel Web Crawling
13.4 Asynchronous Futures
13.5 Asynchronous Web Crawling
def fetchAllLinksParallel(startTitle: String, depth: Int): Set[String] =
  var seen = Set(startTitle)
  var current = Set(startTitle)
  for i <- Range(0, depth) do
    val futures = for title <- current yield Future{ fetchLinks(title) }
    val nextTitleLists = futures.map(Await.result(_, Inf))
    current = nextTitleLists.flatten.filter(!seen.contains(_))
    seen = seen ++ current
  seen
13.1.scala
Snippet 13.1: a simple parallel web-crawler implemented using Scala Futures
The Scala programming language comes with a Futures API. Futures make parallel and asynchronous programming much easier to handle than working with traditional techniques of threads, locks, and callbacks.
This chapter dives into Scala's Futures: how to use them, how they work, and how you can use them to parallelize data processing workflows. It culminates in using Futures together with the techniques we learned in Chapter 12: Working with HTTP APIs to write a high-performance concurrent web crawler in a straightforward and intuitive way.
For this chapter, we will use the Scala CLI REPL:
> import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors
> given ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
13.2.scala
We import the common classes and functions that let us work with Futures from the
scala.concurrent package. We also import the infinite duration
scala.concurrent.duration.Duration.Inf, and construct an ExecutionContext using a
newFixedThreadPool with 8 threads as a reasonable default to get started with.
You can control the level of parallelism by changing the number of threads.
There are also other kinds of ExecutionContext available for more advanced use
cases.
Note that in a larger application, ExecutionContexts created from thread pools
need to be explicitly terminated using .shutdown() when no longer needed (e.g.
in a finally block at the end of the method in which they are used, or at the
end of the main method if they are global). Thread pools are somewhat
expensive to instantiate, so where possible you typically want to share them
throughout your program rather than re-instantiating them everywhere they are
needed.
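For example, a minimal sketch of this pattern in a standalone program; doWork here is a hypothetical stand-in for logic that creates and awaits Futures:

import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors

// Hypothetical stand-in for real logic that uses the given ExecutionContext
def doWork()(using ExecutionContext): Unit =
  println(Await.result(Future{ 1 + 1 }, Inf))

@main def run(): Unit =
  val service = Executors.newFixedThreadPool(8)
  given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)
  try doWork()
  finally service.shutdown() // release the pool's threads once no longer needed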
As an example computation we can parallelize using Futures, we will be using an
implementation of the BCrypt hash function from the at.favre.lib:bcrypt library. We can
load this into our Scala REPL and define a def hash function that we
can use to hash files.
$ ./mill --import "at.favre.lib:bcrypt:0.9.0" --repl
> import at.favre.lib.crypto.bcrypt.{BCrypt, LongPasswordStrategies}
> val hasher = BCrypt.`with`(
    LongPasswordStrategies.hashSha512(BCrypt.Version.VERSION_2A))
> def hash(name: String) =
    val salt = name.take(16).padTo(16, ' ').getBytes
    val bytes = hasher.hash(/*difficulty*/ 17, salt, os.read.bytes(os.pwd / name))
    String(bytes)
13.3.scala
We will be using the following four files (Chinatown.jpg, Kresge.jpg, Memorial.jpg, and ZCenter.jpg) as the data we will hash throughout this chapter.
With the difficulty level of hashing set to 17, hashing one of these files
takes a significant amount of time to complete.
This is clear if we define a timer helper function and then run hash
on e.g. Chinatown.jpg:
> def time[T](op: => T) =
    val start = System.nanoTime
    val value = op
    val taken = System.nanoTime - start
    (value, duration.FiniteDuration(taken, "nanos"))
> time{ hash("Chinatown.jpg") }
res0: (String, scala.concurrent.duration.FiniteDuration) = (
"$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2",
10942495521 nanoseconds
)
13.4.scala
Using the time{} function, we can see that this takes about 10 seconds to
complete, depending on the hardware you are running on.
Consider the following code that executes two calls to hash sequentially. We
can see that this takes about 20 seconds to run:
> time{
    println(hash("Chinatown.jpg"))
    println(hash("ZCenter.jpg"))
  }
$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2
$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zjhQLWi
res1: (Unit, scala.concurrent.duration.FiniteDuration) = ((), 20273416425 nanoseconds)
13.5.scala
Typical Scala code, like code in most programming languages, executes sequentially: line
by line from top to bottom, left to right within each line. If we can run both
hash calls in parallel, we should be able to do it in half the time: about 10
seconds. That is where Futures come in.
The basic building block is scala.concurrent.Future, which we have imported as
simply Future. Futures define lightweight tasks that run on a thread pool,
perform some computation, and return a value. We constructed a simple thread pool
earlier (Executors.newFixedThreadPool(8)), but there are a range of options you can
choose from (newWorkStealingPool, newSingleThreadExecutor, etc.).
To create a future, use the Future{ ... } syntax:
> val f1 = Future{ "hello" + 123 + "world" }
f1: scala.concurrent.Future[String] = Future(Success(hello123world))
> val f2 = Future{ hash("Chinatown.jpg") }
f2: scala.concurrent.Future[String] = Future(<not completed>)
13.6.scala
We can see that the first Future f1 is already completed when printed out.
That makes sense, as the computation is simple and would complete almost
instantly, even before the result is printed to the console. The second Future
f2 uses the hash function we defined earlier, and therefore takes longer and
is listed as <not completed>. However, both lines val f1 = and val f2 =
complete instantly! This is because when we create a Future, it runs in the
background on a thread pool and we can continue doing other things while it is
working.
If we wait about 10 seconds and ask for f2 again, we will see it complete and
print out the hash of the file:
> f2
res2: scala.concurrent.Future[String] = Future(Success($2a$17$O0fnZkDyZ1b...))
13.7.scala
A Future's computation returns the single value at the end of the Future{ ... } block.
When we want the return value of the Future, we can use Await.result to get it:
> val f = Future{ hash("Chinatown.jpg") }
f: scala.concurrent.Future[String] = Future(<not completed>)
> Await.result(f, Inf)
res3: String = "$2a$17$O0fnZkDyZ1bsJinOPw.eG.H80jYKe4v1rAF8k5sH9uRue4tma50rK"
13.8.scala
Even just creating a single Future to run in the background can be useful, as
you can have your code do other things while the Future is running. For example,
we can run one hash call inside the Future, do another hash call on the main
thread, and then use Await.result to wait for the background call to complete:
> val f = Future{ hash("Chinatown.jpg") }
f: scala.concurrent.Future[String] = Future(<not completed>)
> val result = hash("ZCenter.jpg")
result: String = "$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zj..."
> val backgroundResult = Await.result(f, Inf)
backgroundResult: String = "$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefg..."
13.9.scala
We can visualize the execution of the main thread and future as follows:
What Await.result does depends on whether the Future is completed when the
Await.result is called on the main thread:
- If the Future is already completed, then Await.result immediately returns the value computed by the Future for you to use
- If the Future has not yet completed, then Await.result will wait until the value is ready before returning it
In both cases, you do not need to worry about using mutable variables, locking,
or race conditions when transferring the result of the background Future back to
the main thread: the Await.result function does that for you automatically,
ensuring that your main thread will be given the backgroundResult value to use
once it is ready.
We have so far looked at two cases: awaiting a single Future directly, and running a Future in the background while the main thread does other work.
Let's now consider a third case: splitting a trivially-parallel computation into N parts, and running it N-ways parallel in the background.
First, consider the following sequential computation, which runs our def hash
function over every file in the process working directory. Below, we can see
that the process overall takes about 39 seconds, running sequentially in a
for-comprehension:
> val (hashes, duration) = time{
    for p <- os.list(os.pwd) yield
      println("Hashing " + p)
      hash(p.last)
  }
Hashing /Users/lihaoyi/test/Chinatown.jpg
Hashing /Users/lihaoyi/test/Kresge.jpg
Hashing /Users/lihaoyi/test/Memorial.jpg
Hashing /Users/lihaoyi/test/ZCenter.jpg
val hashes: IndexedSeq[String] = ArraySeq(
"$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2",
"$2a$17$Q1Hja0bjJknuXw.eGA.eG.KQ7bQl8kQbzR4sTBdT97icinfz5xh66",
"$2a$17$RUTrZ1HnWUusYl/lGA.eG.TK2UsZfBYw6mLhDyORr659FFz2lPwZK",
"$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zjhQLWi"
)
val duration: scala.concurrent.duration.FiniteDuration = 38949493436 nanoseconds
13.10.scala
Because hashing one file is totally independent of hashing any other file, this
computation is trivially parallelizable, and we can use Futures to utilize
multiple cores to do the hashing in parallel. This involves kicking off a
separate Future{...} block for every file we want to hash, and then waiting
for all the results at the end:
val (hashes, duration) = time{
-  for p <- os.list(os.pwd) yield
+  val futures = for p <- os.list(os.pwd) yield Future{
     println("Hashing " + p)
     hash(p.last)
+  }
+  futures.map(Await.result(_, Inf))
}
13.11.scala
When we run this parallelized code snippet in the REPL, it produces the following output:
Hashing /Users/lihaoyi/test/Chinatown.jpg
Hashing /Users/lihaoyi/test/Kresge.jpg
Hashing /Users/lihaoyi/test/ZCenter.jpg
Hashing /Users/lihaoyi/test/Memorial.jpg
val hashes: IndexedSeq[String] = ArraySeq(
"$2a$17$O0fnZkDyZ1bsJknuXw.eG.9Mesh9W03ZnVPefgcTVP7sc2rYBdPb2",
"$2a$17$Q1Hja0bjJknuXw.eGA.eG.KQ7bQl8kQbzR4sTBdT97icinfz5xh66",
"$2a$17$RUTrZ1HnWUusYl/lGA.eG.TK2UsZfBYw6mLhDyORr659FFz2lPwZK",
"$2a$17$UiLjZlPjag3oaEaeGA.eG.8KLuk3HS0iqPGaRJPdp1Bjl4zjhQLWi"
)
val duration: scala.concurrent.duration.FiniteDuration = 10292549149 nanoseconds
13.12.output-scala
While earlier we had individual Futures that we Awaited on one at a time, now we have
a whole list of Futures (val futures) that we map over to await on all of
them.
Note that the order in which the independent Futures are executed is arbitrary:
the four Hashing ... messages printed by our parallel hashing code are in a
different order than our previous sequential execution, and may be in different
orders if we re-run the parallel hashing multiple times. However, when we run
futures.map(Await.result(_, Inf)), the final collection of hashes will be
assembled in the same order the Futures were created regardless of which order
the hashing actually happened.
We can see that the total time is down from 39 seconds to 10 seconds (10,292,549,149 nanoseconds). This is expected, since the Futures running on a background thread pool utilize all CPU cores to run in parallel.
Unlike threads, using Futures is often an easy way to make use of parallelism for minimal extra effort: here, a three-line code change! You do not need to deal with the error-prone locks, queues, volatiles, and atomics that typically permeate multi-threaded code. These constructs are still used under the hood, but the Futures framework encapsulates them so you usually do not need to work with them directly.
Futures are also much cheaper than threads, and can be used much more freely:
- The memory overhead of a thread is typically several megabytes, while that of a Future is less than a kilobyte
- Switching between threads takes several microseconds, while switching between Futures takes about a tenth of a microsecond
As a result, while you generally want to stay below 1,000 threads due to performance and memory concerns, you can easily have 100,000s or 1,000,000s of Futures without issue. In this case, even if the folder had a large number of files within it, creating one Future per file is unlikely to cause issues.
Like Threads, the order in which Futures evaluate is arbitrary, and may differ between runs of your program. However, as long as the value being evaluated by each Future does not have side effects, the non-deterministic order of evaluation will not affect the behavior or correctness of your code. You thus should ensure the computations you are parallelizing with Futures are "pure" and only return a value with no observable side effects: if not, the side effects may occur in a different order each time your code runs, resulting in confusing and non-deterministic bugs.
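For example, here is a minimal sketch contrasting an impure, side-effecting use of Futures with a pure one:

// Unsafe: every Future mutates the same variable, so increments can race
// with each other and be lost, giving different totals on different runs
var unsafeTotal = 0
val unsafeFutures = for i <- Range(0, 1000) yield Future{ unsafeTotal += i }

// Pure: each Future only returns a value, and all aggregation happens
// deterministically on the main thread after the Futures complete
val futures = for i <- Range(0, 1000) yield Future{ i }
val total = futures.map(Await.result(_, Inf)).sum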
Apart from parallelizing CPU-bound code, Futures can also be useful for parallelizing network-bound code. We will now explore how to use Futures to parallelize a common task (and a common interview exercise!): writing a parallel web crawler.
Let us assume the task is as follows:
Given the title of a page on Wikipedia (e.g. "Albert Einstein"), write a program that will fetch the set of all pages within a certain number depth of links from the root page, and do so in parallel for performance.
There are several ways of approaching this exercise, which is essentially to implement a traversal of the graph of Wikipedia pages and the links between them. You can do the traversal either breadth-first or depth-first, and then there are many ways you could implement parallelism. For now we will consider just one approach: a breadth first traversal, parallelized using Futures.
For example, consider the following code to make an HTTP request and fetch all the links on a Wikipedia page:
FetchLinks.scala
13.13.scala
def fetchLinks(title: String): Seq[String] =
  val resp = requests.get(
    "https://en.wikipedia.org/w/api.php",
    params = Seq(
      "action" -> "query",
      "titles" -> title,
      "prop" -> "links",
      "format" -> "json"
    )
  )
  for
    page <- ujson.read(resp)("query")("pages").obj.values.toSeq
    links <- page.obj.get("links").toSeq
    link <- links.arr
  yield link("title").str
This makes use of the requests library to fetch the data from Wikipedia, and
the ujson library to parse it, similar to what we did in Chapter 12: Working with HTTP APIs. You can add a println(resp.text()) if you would like to see
the raw response from Wikipedia, but for this chapter it is enough to know that
this function can be called on any Wikipedia page to return a list of pages that
are linked from it:
> val links = fetchLinks("Albert Einstein")
links: Seq[String] = Seq(
"20th Century Press Archives",
"2dF Galaxy Redshift Survey",
"A priori and a posteriori",
"Aage Bohr",
"Aarau",
"Aargau",
"Abba Eban",
"Abdominal aortic aneurysm",
"Abdus Salam",
"Absent-minded professor"
)
13.14.scala
For simplicity, let's ignore the fact that Wikipedia only returns the first 10
links on each page by default, and requires pagination to access the full list.
This fetchLinks function will be the foundation on which we build our
Wikipedia crawler.
First, let us write the function that does a simple breadth-first traversal of the page-link graph:
Crawler.scala
13.15.scala
//| moduleDeps: [./FetchLinks.scala]
def fetchAllLinks(startTitle: String, depth: Int): Set[String] =
  var seen = Set(startTitle)
  var current = Set(startTitle)
  for i <- Range(0, depth) do
    val nextTitleLists = for title <- current yield fetchLinks(title)
    current = nextTitleLists.flatten.filter(!seen.contains(_))
    seen = seen ++ current
  seen
Unlike the breadth-first search we implemented in Chapter 6: Implementing Algorithms in Scala, we do not maintain an explicit queue of links to process,
and instead simply process all depth 1 links, then all depth 2 links, etc. until
we get to the depth we want. The current set keeps track of what pages belong
to the current depth, and the seen set prevents us from redundantly processing
a page more than once. Loading this into the REPL using ./mill -i Crawler.scala:repl,
we can call fetchAllLinks to traverse the page-link
graph to various depths:
> fetchAllLinks("Singapore", 2)
res4: Set[String] = HashSet(
"1907 Vancouver anti-Asian riots",
"14th G-15 summit",
"1945 Yugoslavian parliamentary election",
"1819 Singapore Treaty",
".ai",
...
13.16.scala
Adding parallelism is as simple as performing each batch of fetches in parallel
using Future{ ... }, then aggregating the results using Await.result:
- val nextTitleLists = for title <- current yield fetchLinks(title)
+ val futures = for title <- current yield Future{ fetchLinks(title) }
+ val nextTitleLists = futures.map(Await.result(_, Inf))
13.17.scala
The final code looks like this:
Crawler.scala
13.18.scala
//| moduleDeps: [./FetchLinks.scala]
import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors
import duration.*

val service = Executors.newFixedThreadPool(8)
given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)

def fetchAllLinksParallel(startTitle: String, depth: Int): Set[String] =
  var seen = Set(startTitle)
  var current = Set(startTitle)
  for i <- Range(0, depth) do
    val futures = for (title <- current) yield Future{ fetchLinks(title) }
    val nextTitleLists = futures.map(Await.result(_, Inf))
    current = nextTitleLists.flatten.filter(!seen.contains(_))
    seen = seen ++ current
  seen
This parallelizes each batch of fetchLinks calls, waiting until all calls in
that batch complete before aggregating the results to prepare the next batch. We
can then use this function the same way as the non-parallel version, returning
the same results:
> fetchAllLinksParallel("Singapore", 2)
res5: Set[String] = HashSet(
"1907 Vancouver anti-Asian riots",
"14th G-15 summit",
"1945 Yugoslavian parliamentary election",
"1819 Singapore Treaty",
"2nd People's Defence Force",
...
13.19.scala
fetchAllLinksParallel executes as follows:
Comparing the output of fetchAllLinks and fetchAllLinksParallel, it is easy
to verify that the output of the two functions is identical:
> fetchAllLinks("Singapore", 2) == fetchAllLinksParallel("Singapore", 2)
res6: Boolean = true
> fetchAllLinks("Singapore", 3) == fetchAllLinksParallel("Singapore", 3)
res7: Boolean = true
13.20.scala
Using the time{} function again, we can see that the parallel version completes
significantly faster:
> time{fetchAllLinks("Singapore", 2)}._2
res8: scala.concurrent.duration.FiniteDuration = 4719789996 nanoseconds
> time{fetchAllLinksParallel("Singapore", 2)}._2
res9: scala.concurrent.duration.FiniteDuration = 1342978751 nanoseconds
13.21.scala
> time{fetchAllLinks("Singapore", 3)}._2
res10: scala.concurrent.duration.FiniteDuration = 31061249346 nanoseconds
> time{fetchAllLinksParallel("Singapore", 3)}._2
res11: scala.concurrent.duration.FiniteDuration = 4569134866 nanoseconds
13.22.scala
At depth 2, the parallel crawler takes 1.3s instead of 4.7s. At depth 3, the parallel crawler takes 4.6s instead of 31.0s. This is a significant speedup for just a three-line code change!
In this case, we are getting a maximum of about 8x parallelism because the thread pool we constructed has 8 threads.
As a rule of thumb, you generally want to keep the number of threads less than
1,000 to limit resource usage and thread contention. Here the requests.get
method we use in fetchAllLinks is blocking - it needs at least one thread per
open HTTP request - and thus we are limited in how many requests we can send in
parallel. Higher levels of concurrency can be achieved by using Futures
asynchronously, which we will cover later when we discuss
Asynchronous Futures (13.4).
Another limitation may be the service you are querying, in this case Wikipedia. Most third-party services have rate limits meant to stop you from flooding them with requests, and even for those that don't you generally still want to avoid overloading their backend systems. By changing the size of the thread pool, you can control the degree of parallelism to avoid causing issues in the service you are interacting with.
As written, the amount of parallelism we can get is still sub-optimal: If one
HTTP request takes a long time, it prevents the next batch from starting even if
all other parallel HTTP requests have already completed. This is inherent in the
batch-by-batch way we are doing parallelism. Improving fetchAllLinksParallel
to better handle this case is left as an exercise to the reader, and can be done
using techniques from Chapter 16: Message-based Parallelism with Actors.
So far we have been using Await.result to wait for a Future to complete before
we can make use of its result. This makes it convenient to pass results between
our parallel Futures, but requires the system thread calling Await.result to
stop and wait for the Future to complete. Threads have overhead, so while 10 or
100 threads waiting around is not an issue, in high-concurrency systems having
1,000s of threads doing nothing is a significant waste of memory and other
resources.
Futures can also work asynchronously: we can schedule work and interface with asynchronous callbacks without wasting a thread waiting for a result. This can greatly improve performance in high-concurrency scenarios.
There are two main ways to work with Futures asynchronously: interfacing with
external callback-based APIs using Promises, and via asynchronous operators such
as the map, foreach, zip, sequence, and flatMap operations. We
will cover both of them in this section.
So far we have only been creating Futures via Future{ ... } blocks, which
schedules the code in the block to run on a background thread pool. Another way
to create Futures is via scala.concurrent.Promise:
> def doThing(succeed: Boolean) =
    val p: Promise[String] = Promise[String]()
    val f: Future[String] = p.future
    f.onComplete:
      case scala.util.Success(res) => println(s"Success! $res")
      case scala.util.Failure(exception) => println(s"Failure :( $exception")
    if succeed then p.success("Yay!")
    else p.failure(Exception("boom"))
> doThing(succeed = true)
Success! Yay!
res12: scala.concurrent.Promise[String] = Future(Success(Yay!))
> doThing(succeed = false)
Failure :( java.lang.Exception: boom
res13: scala.concurrent.Promise[String] = Future(Failure(java.lang.Exception: boom))
13.23.scala
A Promise is like the "write" side of a Future, while the Future is the "read"
side. Completing the Promise using .success or .failure does two things:
- It makes the result available to anyone blocked on the Future via Await.result
- It triggers any callbacks registered on the Future via .onComplete
Note that Futures and Promises can complete either successfully with a result or
fail with an exception: in the failure case Await.result simply re-throws the
exception, while onComplete requires you to handle both cases.
Using Promises to trigger Futures demonstrates that a Future is not
necessarily a background task running on a thread pool: it is a more general
construct, an operation that may complete asynchronously some time later. A task
running on a thread pool is one such operation, but any operation with some kind
of "on success" callback can also be modeled using Future.
Futures and Promises are equivalent to callbacks. Any function that takes success and error callbacks can be mechanically converted to a function that returns a Future, and vice versa:
def futureFunc(): Future[String] = ...
def callbackFunc(onSuccess: String => Unit, onError: Throwable => Unit): Unit =
  futureFunc().onComplete:
    case scala.util.Success(str) => onSuccess(str)
    case scala.util.Failure(ex) => onError(ex)
13.24.scala
def futureFunc(): Future[String] =
  val p = Promise[String]()
  callbackFunc(
    onSuccess = str => p.success(str),
    onError = ex => p.failure(ex)
  )
  p.future
def callbackFunc(onSuccess: String => Unit, onError: Throwable => Unit): Unit = ...
13.25.scala
While much of the Scala ecosystem uses Futures, the broader Java ecosystem has many libraries that design their asynchronous interfaces around callbacks. This inter-operability makes it straightforward to wrap any sort of asynchronous operation in a Future, allowing you to use it seamlessly from your Future-using code. We will see an example of this later, when we explore Asynchronous Web Crawling (13.5).
If we are working with Futures asynchronously, we generally want to avoid using
Await.result to pass the result of one Future to another. While we can chain
together Futures using callbacks, doing so is often inconvenient and
error-prone. Scala's Future data type contains some useful methods that allow
you to combine and transform the result of your Futures in a few common ways,
making asynchronous computations both efficient and convenient.
In this section, we look at a few synchronous usages of Futures via
Await.result, and see how we can instead write the same computations using
asynchronous operators, without blocking a thread waiting for results.
map lets you transform the result of a Future using a function, while
foreach is similar to the onComplete method we saw earlier, except it only
handles successful results. For example, rather than using Await.result to
wait for the value of the hash and then calling length before printing it, we
can instead use map to schedule the length call to happen whenever the
hashing Future completes, and then foreach to schedule the println to happen
after the length computation completes:
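For example, a minimal sketch of both styles, using the hash function from earlier:

// Synchronous: Await.result blocks the calling thread until the hash is ready
val f = Future{ hash("Chinatown.jpg") }
println(Await.result(f, Inf).length)

// Asynchronous: map schedules the .length call, and foreach schedules the
// println, to run whenever the hashing Future completes, without blocking
Future{ hash("Chinatown.jpg") }
  .map(hashed => hashed.length)
  .foreach(length => println(length))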
This allows us to perform our length computation and println action
asynchronously, without blocking the main thread. Just like how the call to
Future{ ... } returns immediately even if the background computation has not
yet completed, the .map and .foreach calls also return immediately and free
up the calling thread to do other things.
The zip operator lets you combine a Future[T] and a Future[V] into a
Future[(T, V)] that returns a tuple. We can then map on it to process the
tuple, and foreach to print the result. In general, where we were previously
using Await.result on multiple different Futures before proceeding with our
computation, we can instead use zip and map to perform the same logic
asynchronously.
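For example, a minimal sketch of zip, again using the hash function from earlier:

val f1 = Future{ hash("Chinatown.jpg") }
val f2 = Future{ hash("ZCenter.jpg") }
// zip gives a Future[(String, String)] that completes when both inputs do
f1.zip(f2)
  .map{ case (hash1, hash2) => hash1.length + hash2.length }
  .foreach(totalLength => println(totalLength))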
Future.sequence lets you convert a collection of Futures Seq[Future[T]] into
a Future returning a collection Future[Seq[T]]. Where we previously used
.map(Await.result(_, Inf)) to aggregate a sequence of asynchronous results,
Future.sequence lets us do the aggregation asynchronously. We can then use
map or foreach to further process the Seq[T] that the sequenced Future
computes.
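For example, a minimal sketch of Future.sequence, hashing every file in the working directory as we did earlier:

val futures = for p <- os.list(os.pwd) yield Future{ hash(p.last) }
// Future.sequence turns the IndexedSeq[Future[String]] into a single
// Future[IndexedSeq[String]] that completes when every hash is done
Future.sequence(futures)
  .map(hashes => hashes.length)
  .foreach(count => println(s"hashed $count files"))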
To make use of what we have learned about asynchronous Futures, let us convert
the parallel-but-synchronous web crawler we wrote earlier in
Parallel Web Crawling (13.3) to make it asynchronous. We want
to avoid using a thread for every HTTP request, and avoid blocking the main
thread using Await.result. This will allow us to ramp up the concurrency: we
could make thousands of concurrent web requests, as many as the network
bandwidth will allow, without worrying about per-thread overhead.
The first step to asynchronous crawling is to convert our for-loop into a
recursive function. We need to do this as for-loops are inherently
synchronous, whereas recursion gives us the flexibility to evaluate either
synchronously or asynchronously. We can do that conversion as follows:
Crawler.scala
13.32.scala
//| moduleDeps: [./FetchLinks.scala]
import scala.concurrent.*, duration.Duration.Inf, java.util.concurrent.Executors

val service = Executors.newFixedThreadPool(8)
given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)

def fetchAllLinksRec(startTitle: String, depth: Int): Set[String] =
  def rec(current: Set[String], seen: Set[String], recDepth: Int): Set[String] =
    if recDepth >= depth then seen
    else
      val futures = for title <- current yield Future{ fetchLinks(title) }
      val nextTitles = futures.map(Await.result(_, Inf)).flatten
      rec(nextTitles.filter(!seen.contains(_)), seen ++ nextTitles, recDepth + 1)
  rec(Set(startTitle), Set(startTitle), 0)
For now, we are simply converting the fetchAllLinksParallel implementation
from using a loop to using recursion, while preserving its existing behavior. In
the conversion from a loop to recursion, there are a few major changes:
- The for-loop has now become a recursive function rec, whose implementation contains most of the logic that was previously in the loop body
- We call rec once at the bottom of fetchAllLinksRec in order to start the recursion
- The current and seen variables are now instead parameters of rec, which is a recursive function defined within fetchAllLinksRec
- We now need to keep track of the recDepth of each recursion ourselves and terminate the recursion once the desired depth has been reached
- If we do not wish the recursion to terminate, we call rec recursively to make it execute another time
This conversion of a loop to recursion is a bit tricky, but sometimes necessary.
Here it lets us make the looping logic asynchronous. For now, fetchAllLinksRec
should behave in exactly the same way as our earlier fetchAllLinksParallel
function: it is still synchronous, and still calls the same synchronous
fetchLinks function we defined earlier. The only difference is that we
converted the for-loop to recursion. Next, we will look at converting this to
allow asynchronous crawling.
Next, we need to make our fetchLinks function asynchronous. The requests
package we were using earlier is optimized for simple synchronous operation, so
we need to find a replacement. Luckily, the Java ecosystem has a wealth of such
libraries, such as the popular AsyncHttpClient library:
While this is a Java library, it is straightforward to use it from our Scala
code. Going through the documentation, we first need to create an
asyncHttpClient(), and after that it's straightforward to turn our
requests.get call into an asyncHttpClient.execute():
FetchLinksAsync.scala
13.33.scala
//| mvnDeps:
//| - org.asynchttpclient:async-http-client:2.5.2
import scala.concurrent.*

val asyncHttpClient = org.asynchttpclient.Dsl.asyncHttpClient() // remember to close

def fetchLinksAsync(title: String)(using ec: ExecutionContext): Future[Seq[String]] =
  val p = Promise[String]()
  val listenableFut = asyncHttpClient
    .prepareGet("https://en.wikipedia.org/w/api.php")
    .addQueryParam("action", "query")
    .addQueryParam("titles", title)
    .addQueryParam("prop", "links")
    .addQueryParam("format", "json")
    .execute()
  listenableFut.addListener(() => p.success(listenableFut.get().getResponseBody), null)
  val scalaFut: Future[String] = p.future
  scalaFut.map: responseBody =>
    for
      page <- ujson.read(responseBody)("query")("pages").obj.values.toSeq
      links <- page.obj.get("links").toSeq
      link <- links.arr
    yield link("title").str
Note that while execute returns a Java org.asynchttpclient.ListenableFuture,
which is different from the scala.concurrent.Futures we have been using so
far, it is straightforward to use
Interfacing with Callbacks using Promises (13.4.2)
to convert it into a Scala future scalaFut. We then use map on scalaFut to
take the response body String and extract the titles, like what we saw
earlier. fetchLinksAsync then returns a Future[Seq[String]], indicating
that the results are not available immediately when the function call returns,
but will be made available some time in the future.
The fetchLinksAsync method above is defined to take a using ec: ExecutionContext
as a parameter. This ensures that any asynchronous operations
fetchLinksAsync performs share the thread pool of whichever method is calling it.
Typically, an application will only instantiate one thread pool and share it
throughout all methods that need it via a using parameter. Thread pools
are relatively expensive to instantiate, so you generally want to share them
rather than re-instantiating a separate one for every method.
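For example, a minimal sketch of this pattern (hashInBackground is a hypothetical helper, and the imports are those from earlier in the chapter):

// The method declares the thread pool it needs as a using parameter...
def hashInBackground(name: String)(using ExecutionContext): Future[String] =
  Future{ hash(name) }

// ...and the caller's given ExecutionContext is passed along implicitly
given ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
val f = hashInBackground("Chinatown.jpg")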
Sharing a common thread pool also makes your application convenient to
re-configure: if you find yourself deploying your application on more powerful
hardware, it is easy to change the number of threads the shared thread pool
uses in one place. Similarly, if you want to run your application
single-threaded to help narrow down a tricky bug, you can make your entire
application run on a newSingleThreadExecutor to eliminate any
parallelism-related complexity.
Lastly, we can combine our asynchronous fetchLinksAsync method with our
recursive fetchAllLinksRec method to produce a fetchAllLinksAsync method
that does an asynchronous crawl of the Wikipedia API:
Crawler.scala
13.34.scala
//| moduleDeps: [./FetchLinksAsync.scala]
import scala.concurrent.*, java.util.concurrent.Executors

val service = Executors.newFixedThreadPool(8)
given ec: ExecutionContext = ExecutionContext.fromExecutorService(service)

def fetchAllLinksAsync(startTitle: String, depth: Int): Future[Set[String]] =
  def rec(current: Set[String], seen: Set[String], recDepth: Int): Future[Set[String]] =
    if recDepth >= depth then Future.successful(seen)
    else
      val futures = for title <- current yield fetchLinksAsync(title)
      Future.sequence(futures)
        .map: nextTitleLists =>
          val nextTitles = nextTitleLists.flatten
          rec(nextTitles.filter(!seen.contains(_)), seen ++ nextTitles, recDepth + 1)
        .flatten
  rec(Set(startTitle), Set(startTitle), 0)
The above definition of fetchAllLinksAsync is a modified version of
fetchAllLinksRec. In making our Wikipedia crawler asynchronous, there are the
following major changes:
- Both fetchAllLinksAsync and the inner rec function now return Future[Set[String]]
- Rather than spawning a bunch of Futures using Future{ ... } blocks, we now directly call fetchLinksAsync on every value in current
- We use Future.sequence to aggregate each batch of results, rather than calling Await.result on every item, and use map to perform the further computation
- In order to ensure that all branches of rec return a Future[Set[String]], we need to wrap the base case seen in a Future.successful(...) so we don't return a bare Set[String], and call .flatten on the recursive case so we don't return a nested Future[Future[Set[String]]]
Note that the .map{...}.flatten pattern can also be written as a single
.flatMap{...}, which is a common operator when working with asynchronous
workflows using Futures.
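For example, a sketch using the fetchLinksAsync method defined above:

val titleFut: Future[String] = Future{ "Singapore" }
// map gives a nested Future[Future[Seq[String]]], which flatten unwraps...
val a: Future[Seq[String]] = titleFut.map(title => fetchLinksAsync(title)).flatten
// ...or flatMap does both steps in one call
val b: Future[Seq[String]] = titleFut.flatMap(title => fetchLinksAsync(title))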
We can now run this and print the results as follows:
> fetchAllLinksAsync("Singapore", 3).foreach(println)
HashSet(Cheval de frise, 24 Horas (Spanish TV channel), 2nd People's Defence Force, ...
13.35.scala
The call to fetchAllLinksAsync returns instantly, returning the
Future[Set[String]]. Here we use foreach to println on the result when it
is ready, but we can also combine it into a larger asynchronous Futures workflow
using the same map, zip and sequence operations we saw earlier.
Now that we are no longer limited in parallelism by the number of threads
available, deeper crawls with a lot more potential concurrency finish far
quicker. With a crawl depth of 4, fetchAllLinksParallel finishes in about
11.3 seconds while fetchAllLinksAsync finishes in 2.6 seconds:
> val (res, ns) = time{ fetchAllLinksParallel("Singapore", 4) }
res: Set[String] = HashSet("Ascension Island", "Baba House", ...)
val ns: scala.concurrent.duration.FiniteDuration = 11358217828 nanoseconds
> val (res, ns) = time{ Await.result(fetchAllLinksAsync("Singapore", 4), Inf)}
res: Set[String] = HashSet("Ascension Island", "Baba House", ...)
val ns: scala.concurrent.duration.FiniteDuration = 2620180174 nanoseconds
13.36.scala
In general, asynchronous concurrency allows a much greater degree of concurrency than blocking parallelism, but only for non-CPU-bound code such as the HTTP requests we are making above. Your system may still be bottle-necked in other places - network bandwidth, server rate-limiting, etc. - but it will no longer be limited by the number of threads available. While this asynchronous implementation is not necessary for small-scale crawlers, it can be useful when the number of web pages is large, allowing a single computer to crawl thousands of web pages at a time.
The implementation of fetchAllLinksAsync might look a bit unusual, but it is a
relatively simple piece of code: 20 lines of code to make asynchronous HTTP
requests, and 15 lines of code for the traversal logic. If we had implemented
this using threads, or directly using the callbacks provided by AsyncHttpClient,
we would need orders of magnitude more code to implement our parallel,
asynchronous, breadth-first traversal of the graph of Wikipedia articles over
the network.
In this chapter, we have introduced the usage of Scala's Futures as a way of easily starting parallel background tasks and aggregating their results, as well as a way of scheduling asynchronous workflows. We have covered parallel computation and N-ways parallelism using Futures, parallel and asynchronous web crawling, and working with Futures asynchronously via Promises and operators like map, zip, Future.sequence, and flatMap.
Futures fit best in a "fork-join" model of parallelism: we split off independent tasks, run them in parallel, and then aggregate the results. This works best when each task is independent and pure, without internal state, and indifferent to what order it happens in. In other cases, there is still parallelism to be had: we will cover such scenarios later, in Chapter 16: Message-based Parallelism with Actors.
We have now spent the last three chapters interacting with HTTP servers and services from a client's perspective. In the next two chapters, we will see things from the server's point of view: how to set up the HTML websites, JSON APIs, and backing databases that make up a typical web service today.
Exercise: Use Futures to parallelize the MDN Web API Documentation scraper we wrote in Chapter 11: Scraping Websites, allowing it to run N ways parallel on the background thread pool.
See example 13.6 - ParallelScrapingDocs
Exercise: Take the generic merge sort from Chapter 6: Implementing Algorithms in Scala and parallelize it using Futures. As the number of Futures may be
large, we should avoid blocking threads using Await and instead use
Asynchronous Operators (13.4.3) where possible. Compare the
performance of the parallel and sequential merge sorts: is the parallel merge
sort faster? How is the speed affected if we fall back to sequential merge
sorts when the sequences become small?
Exercise: Modify our asynchronous Wikipedia crawler to limit the concurrent requests it makes to a configurable number, e.g. 32, to avoid being rate-limited or overloading the server we are fetching data from.
See example 13.8 - AsyncCrawlerThrottled
Exercise: Make use of the AsyncHttpClient we used in this chapter, together with the
Jsoup.parse function we saw in Chapter 11: Scraping Websites, to write an
asynchronous MDN Web API Documentation scraper that can run in parallel
without being limited by the number of threads. To avoid overloading the
network or server, limit the number of concurrent requests to 32.