17

Multi-Process Applications


17.1 Two-Process Build Setup
17.2 Remote Procedure Calls
17.3 The Agent Process
17.4 The Sync Process
17.5 Pipelined Syncing

def send[T: Writer](out: DataOutputStream, msg: T): Unit =
  val bytes = upickle.writeBinary(msg)
  out.writeInt(bytes.length)
  out.write(bytes)
  out.flush()
def receive[T: Reader](in: DataInputStream) =
  val buf = new Array[Byte](in.readInt())
  in.readFully(buf)
  upickle.readBinary[T](buf)
17.1.scala

Snippet 17.1: RPC send and receive methods for sending data over an operating system pipe or network

While all our programs so far have run within a single process, in real-world scenarios you will be working as part of a larger system, and the application itself may need to be split into multiple processes. This chapter will walk you through how to do so: configuring your build tool to support multiple Scala processes, sharing code between them, and exchanging serialized messages. These are the building blocks that form the foundation of any distributed system.

As this chapter's project, we will be building a simple multi-process file synchronizer that can work over a network. This chapter builds upon the simple single-process file synchronizer in Chapter 7: Files and Subprocesses, and will form the basis for Chapter 18: Building a Real-time File Synchronizer.

In Chapter 7: Files and Subprocesses, we learned how to work with the filesystem using Scala, and wrote a simple function that synchronizes the contents of two folders by comparing the files on each side and copying files as necessary from one to the other. However, there was one big limitation of the def sync function we wrote: it can only work if both src and dest folders are on the same computer! This is because it relies on having direct access to read and write to the files in both src and dest:

[Diagram: the sync process, running on a single Local Computer, reads directly from the src folder and writes directly to the dest folder]

This of course rules out possibly the most common use case of file syncing: synchronizing the contents of a folder between two computers across a network. To allow for remote file syncing, we will need a slightly different approach:

  • A sync process with direct access to the src/ folder will spawn an agent process with direct access to the dest/ folder

  • When sync needs to query or perform some action on the dest/ folder, it will send a message to agent via the agent's standard input stream, and the agent will reply via its standard output stream

For the purposes of this exercise we will be running both sync and agent on the same computer, with the two processes connected by an operating system pipe:

[Diagram: on the Local Computer, sync reads from the src folder and talks to agent over an operating system pipe; agent reads and writes the dest folder]

While the snippets in this chapter run both processes on the same computer, it is trivial to extend this to a remote scenario by e.g. spawning the child process over an SSH connection:

[Diagram: sync reads from the src folder on the Local Computer and talks over SSH to agent running on a Shared Computer; agent reads and writes the dest folder there]

17.1 Two-Process Build Setup

We will set up this project using the Mill build tool. You can install Mill in your current project as follows:

$ REPO=https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/1.0.6

$ curl -L "$REPO/mill-dist-1.0.6-mill.sh" -o mill

$ chmod +x mill
17.2.bash

We will start with the following build.mill file:

build.mill
import mill.*, scalalib.*
trait SyncModule extends ScalaModule:
  def scalaVersion = "3.7.3"
  def mvnDeps = Seq(
    mvn"com.lihaoyi::upickle:4.4.2",
    mvn"com.lihaoyi::os-lib:0.11.6"
  )
object shared extends SyncModule
object sync extends SyncModule:
  def moduleDeps = Seq(shared)
object agent extends SyncModule:
  def moduleDeps = Seq(shared)
17.3.scala

This build file defines two modules - sync and agent - each with their respective sync/src/ and agent/src/ folders. Both modules share the same Scala version and dependencies on the uPickle and OS-Lib libraries, as well as a dependency on a shared module with its corresponding shared/src/ folder. You will need to manually create the sync/src/, agent/src/ and shared/src/ folders.

$ mkdir -p agent/src sync/src shared/src

You can then run $ ./mill --bsp-install to load the project into your IDE:

[Screenshot: the two-module project loaded into IntelliJ]

17.1.1 Integrating our Simple File Syncer into Mill

First let us take the Scala CLI REPL snippet we wrote in Chapter 7: Files and Subprocesses and integrate it into our Mill project. That involves taking the original def sync function and converting it into a proper @main entrypoint that takes its arguments from the command line:

sync/src/Sync.scala
-def sync(src: os.Path, dest: os.Path) =
+package sync
+object Sync:
+  @main def main(src0: String, dest0: String): Unit =
+    val src = os.Path(src0, os.pwd)
+    val dest = os.Path(dest0, os.pwd)
     for srcSubPath <- os.walk(src) do
       val subPath = srcSubPath.subRelativeTo(src)
       val destSubPath = dest / subPath
       (os.isDir(srcSubPath), os.isDir(destSubPath)) match
         case (false, true) | (true, false) => os.copy.over(srcSubPath, destSubPath)
         case (false, false)
           if !os.exists(destSubPath)
           || !os.read.bytes(srcSubPath).sameElements(os.read.bytes(destSubPath)) =>

           os.copy.over(srcSubPath, destSubPath, createFolders = true)

         case _ => // do nothing
17.4.scala

17.1.2 Testing our File Syncer

We can then run the following script to package the compiled code into a self-contained executable, often called an assembly. Once packaged, we can run the executable manually to sync the files:

$ ./mill show sync.assembly
"ref:v0:1845e552:/Users/lihaoyi/test/out/sync/assembly.dest/out.jar"

$ find sync -type f
sync/src/Sync.scala

$ mkdir test

$ out/sync/assembly.dest/out.jar sync test

$ find test -type f
test/src/Sync.scala
17.5.bash
See example 17.1 - Main

17.2 Remote Procedure Calls

The next step is to take the various os.* calls that the sync process makes on the dest folder, and convert them into remote procedure calls (RPCs) handled by an agent process that performs each operation and responds with the result. The calls we need to care about are:

  • os.isDir(destSubPath)
  • os.exists(destSubPath)
  • os.read.bytes(destSubPath)
  • os.copy.over(srcSubPath, destSubPath, createFolders = true)

To convert these into remote procedure calls, we need to define the messages that our two processes will exchange, and the protocol which the two processes will use to exchange them.

17.2.1 Defining our RPC Messages

We will put these messages in shared/src/Rpc.scala, since they will be used by both sync and agent processes:

shared/src/Rpc.scala
package sync
import upickle.{readwriter, ReadWriter}
given subPathRw: ReadWriter[os.SubPath] =
  readwriter[String].bimap[os.SubPath](_.toString, os.SubPath(_))

enum Rpc derives ReadWriter:
  case IsDir(path: os.SubPath)
  case Exists(path: os.SubPath)
  case ReadBytes(path: os.SubPath)
  case WriteOver(src: Array[Byte], path: os.SubPath)
17.6.scala

Here we are defining Rpc as an enum, because we know upfront that there is only a fixed set of messages we will want to pass between the sync and agent processes. We also add derives ReadWriter to the enum, which defines a given ReadWriter covering every case, allowing serialization to JSON or MessagePack binaries, as we saw earlier in Chapter 8: JSON and Binary Data Serialization. We will need this in order to send messages from one process to the other.

Note we also need to define a subPathRw, in order to handle the os.SubPaths that are part of the case classes we are serializing. Since os.SubPath can be easily converted to and from Strings, we make use of bimap in order to convert our ReadWriter[String] into a ReadWriter[os.SubPath] for us to use.
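With these definitions in place, any Rpc can be round-tripped through uPickle's binary format. For example, in a hypothetical REPL session with the above definitions in scope:

val msg: Rpc = Rpc.IsDir(os.sub / "folder1")
val bytes: Array[Byte] = upickle.writeBinary(msg) // MessagePack-encoded bytes
assert(upickle.readBinary[Rpc](bytes) == msg)     // deserializes back to an equal value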

17.2.2 Bytes over the Wire

Apart from the messages themselves, we need a standard way of sending and receiving Rpc messages between the two processes. Operating system processes communicate via "pipes", which are exposed to a Scala application as a java.io.InputStream you can read from and a java.io.OutputStream you can write to. These are streams of bytes, on top of which we'll need to define a protocol for sending our case class messages.

As our protocol for sending messages over these byte streams, we will send each message in a length-prefixed fashion: every message will be preceded by a single 4-byte Int that tells the agent how long that message is, allowing it to read the correct number of bytes to pass into upickle.readBinary to deserialize into an Rpc object:

[Diagram: a byte stream arriving on stdin. The first four bytes are parsed as the length prefix (here 11), the next 11 bytes are read in full and passed to upickle.readBinary, which deserializes them into an Rpc]

We assume that anyone receiving data over one of these byte streams will receive a continuous sequence of such messages, each one preceded by a 4-byte header telling it how many bytes that message contains. This can be implemented as follows:

shared/src/Shared.scala
package sync
import upickle.{Reader, Writer}
object Shared:
  def send[T: Writer](out: java.io.DataOutputStream, msg: T): Unit =
    val bytes = upickle.writeBinary(msg)
    out.writeInt(bytes.length)
    out.write(bytes)
    out.flush()
  def receive[T: Reader](in: java.io.DataInputStream) =
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    upickle.readBinary[T](buf)
17.7.scala

This protocol for sending messages via DataOutputStreams and receiving messages via DataInputStreams is implemented by the send and receive methods above. We use DataInputStreams and DataOutputStreams as they provide some conveniences e.g. for reading/writing 4-byte integers. We put this logic in shared/ because both the sync and agent processes will need to send messages to one another.
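As a quick sanity check, send and receive can be exercised within a single process by connecting them over an in-memory pipe standing in for the real operating system pipe (a test-only sketch; the java.io.Piped* streams are just for illustration):

import java.io.{DataInputStream, DataOutputStream, PipedInputStream, PipedOutputStream}
val pipeIn = PipedInputStream()
val out = DataOutputStream(PipedOutputStream(pipeIn))
val in = DataInputStream(pipeIn)
Shared.send(out, "hi") // writes the length prefix 00 00 00 03, then the MessagePack bytes a2 68 69
assert(Shared.receive[String](in) == "hi")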

17.3 The Agent Process

Now that we've defined the case classes that represent our RPCs, we can implement our agent process. At a high level, the agent performs the following steps:

  1. Read an Rpc from the sync process
  2. Perform whatever action the Rpc wants it to do: read some bytes, over-write a file, etc.
  3. Return any result to the sync process
  4. Read the next message

The sync process that spawns agent has access to agent's System.in and System.out streams, so we can use those two streams to exchange data between the two processes:

agent/src/Agent.scala
package sync
object Agent:
  @main def run(): Unit =
    val input = java.io.DataInputStream(System.in)
    val output = java.io.DataOutputStream(System.out)
    while true do try
      val rpc = Shared.receive[Rpc](input)
    catch case e: java.io.EOFException => System.exit(0)
17.8.scala

Above, we wrap System.in and System.out in a DataInputStream and DataOutputStream respectively, to be compatible with the send/receive methods we defined earlier. Inside a loop, we call receive to read a single Rpc worth of bytes from sync and deserialize it into an Rpc object.

Lastly, we match on the rpc, and do what it tells us to do, and write the result back to sync using Shared.send:

agent/src/Agent.scala
     while true do try
       val rpc = Shared.receive[Rpc](input)
+      rpc match
+        case Rpc.IsDir(path) => Shared.send(output, os.isDir(os.pwd / path))
+        case Rpc.Exists(path) => Shared.send(output, os.exists(os.pwd / path))
+        case Rpc.ReadBytes(path) => Shared.send(output, os.read.bytes(os.pwd / path))
+        case Rpc.WriteOver(bytes, path) =>
+          os.remove.all(os.pwd / path)
+          Shared.send(output, os.write.over(os.pwd / path, bytes, createFolders = true))
     catch case e: java.io.EOFException => System.exit(0)
17.9.scala

For now, we assume that the agent process is spawned with its os.pwd set to the destination folder we want to sync files to.

The try-catch block around the body of the while loop makes sure that when the sync process terminates, the agent process exits cleanly without a stack trace. Our two-process file synchronizer can still work without the try-catch, but will print an ugly stack trace to the console when syncing is complete and the Agent shuts down.

The agent process is intentionally kept simple, running operations on behalf of the sync process that it is not able to run itself. The agent only knows how to handle one operation at a time, report the result, and nothing else: the complexity will live in the sync process, which we will cover next.
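One convenient property of this design is that the agent can be tested in isolation, by spawning its assembly and speaking the protocol to it by hand. For example, after building it with ./mill agent.assembly (a hypothetical console session, with invented paths):

val agent = os.spawn(cmd = os.pwd / "out" / "agent" / "assembly.dest" / "out.jar", cwd = os.pwd / "test")
Shared.send(agent.stdin.data, Rpc.Exists(os.sub / "hello.txt"))
println(Shared.receive[Boolean](agent.stdout.data)) // false if test/hello.txt is absent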

17.4 The Sync Process

17.4.1 Spawning the Agent

First, we need to give sync access to the agent executable. We can do this by adding agent.assembly to sync's resources in build.mill:

build.mill
 object sync extends SyncModule:
   def moduleDeps = Seq(shared)
+  def resources = Task{
+    os.copy(agent.assembly().path, Task.dest / "agent.jar")
+    super.resources() ++ Seq(PathRef(Task.dest))
+  }
17.10.scala

This ensures that whenever the sync module is packaged and run, the agent's assembly executable is prepared and ready to be loaded for use as agent.jar. We can then use os.read.bytes in sync's Scala code to load the binary contents of agent.jar, write it to a file, and execute it via os.spawn to create a child process we can communicate with:

sync/src/Sync.scala
     val src = os.Path(src0, os.pwd)
     val dest = os.Path(dest0, os.pwd)
+    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
+    os.perms.set(agentExecutable, "rwx------")
+    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
     for srcSubPath <- os.walk(src) do
       val subPath = srcSubPath.subRelativeTo(src)
       val destSubPath = dest / subPath
17.11.scala

This gives us a val agent that represents the running agent subprocess, which is an os.SubProcess object like those in Chapter 7: Files and Subprocesses. Note that we need to write the agent.jar to a temporary file before executing it, as on the JVM packaged resource files are zipped into the enclosing sync executable and cannot be executed directly.

To make it more convenient to exchange messages with agent, we define a callAgent function that writes a message to agent using send, and reads the agent's response using receive:

sync/src/Sync.scala
     val agent = os.spawn(cmd = agentExecutable, cwd = dest)
+    def callAgent[T: upickle.Reader](rpc: Rpc): T =
+      Shared.send(agent.stdin.data, rpc)
+      Shared.receive[T](agent.stdout.data)
     for srcSubPath <- os.walk(src) do
17.12.scala

callAgent is parameterized on the type T that we expect in response to the Rpc we send, and T must be a type with a pre-defined upickle.Reader.
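For instance, hypothetical calls (assuming the paths exist on the destination side):

val remoteIsDir: Boolean = callAgent[Boolean](Rpc.IsDir(os.sub / "folder1"))
val remoteBytes: Array[Byte] = callAgent[Array[Byte]](Rpc.ReadBytes(os.sub / "hello.txt"))

Each call blocks until the agent has performed the operation and sent back its reply.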

17.4.2 Delegating File Operations

Next, we need to update the filesystem walk-compare-copy logic to change all os.* calls that operate on the dest path to instead use callAgent to ask the agent process to do the work for us:

sync/src/Sync.scala
       val subPath = srcSubPath.subRelativeTo(src)
       val destSubPath = dest / subPath
-      (os.isDir(srcSubPath), os.isDir(destSubPath)) match
+      (os.isDir(srcSubPath), callAgent[Boolean](Rpc.IsDir(subPath))) match
-        case (false, true) | (true, false) => os.copy.over(srcSubPath, destSubPath)
+        case (false, true) =>
+          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
+        case (true, false) =>
+          for p <- os.walk(srcSubPath) if os.isFile(p) do
+            callAgent[Unit](Rpc.WriteOver(os.read.bytes(p), p.subRelativeTo(src)))
         case (false, false)
-          if !os.exists(destSubPath)
-          || !os.read.bytes(srcSubPath).sameElements(os.read.bytes(destSubPath)) =>
+          if !callAgent[Boolean](Rpc.Exists(subPath))
+          || !os.read.bytes(srcSubPath).sameElements(
+               callAgent[Array[Byte]](Rpc.ReadBytes(subPath))
+             ) =>

-          os.copy.over(srcSubPath, destSubPath, createFolders = true)
+          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))

         case _ => // do nothing
17.13.scala

This is a relatively mechanical change, except for the (true, false) case: this is the case where a local path is a folder but the remote path is not. Previously we were relying on os.copy.over to recursively copy the source folder over the destination, but now we have to manually send individual WriteOver commands, one for each file in the source folder, to make sure they all get copied to the destination.

17.4.3 Working Syncer

The working Sync.scala code we have written now looks like this:

sync/src/Sync.scala
package sync
object Sync:
  @main def main(src0: String, dest0: String): Unit =
    val src = os.Path(src0, os.pwd)
    val dest = os.Path(dest0, os.pwd)
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
    def callAgent[T: upickle.Reader](rpc: Rpc): T =
      Shared.send(agent.stdin.data, rpc)
      Shared.receive[T](agent.stdout.data)
    for srcSubPath <- os.walk(src) do
      val subPath = srcSubPath.subRelativeTo(src)
      val destSubPath = dest / subPath
      (os.isDir(srcSubPath), callAgent[Boolean](Rpc.IsDir(subPath))) match
        case (false, true) =>
          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
        case (true, false) =>
          for p <- os.walk(srcSubPath) if os.isFile(p) do
            callAgent[Unit](Rpc.WriteOver(os.read.bytes(p), p.subRelativeTo(src)))
        case (false, false)
          if !callAgent[Boolean](Rpc.Exists(subPath))
          || !os.read.bytes(srcSubPath).sameElements(
            callAgent[Array[Byte]](Rpc.ReadBytes(subPath))
          ) =>

          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))

        case _ => // do nothing
17.14.scala

We can test this new multi-process application with both agent and sync, as we saw earlier in Testing our File Syncer (17.1.2), to verify that it works similarly to the earlier one-process version. The big difference is that now the only point of contact between agent and sync is the agent's stdin and stdout.

This split makes it possible to take the two processes and have them run on different computers: e.g. sync may run on your laptop, and agent may run on a cloud server you are trying to sync files to. As long as you can provide a stdin/stdout interface, e.g. by running the agent over SSH, you can sync files across the network.

See example 17.2 - FileSyncer

17.5 Pipelined Syncing

While we now have a simple two-process network-compatible file synchronizer, it behaves roughly the same way as our original one-process file synchronizer: for each file in the source folder, it reads the file on the destination folder, compares the two, and copies the source file over the destination if necessary. While this works, it is an exceedingly "chatty" protocol: the syncer needs to make multiple network round trips in order to sync each file:

  • 2 round trips if the local file corresponds to a remote directory
  • 2 round trips for each file in a local folder that doesn't exist remotely
  • 3 round trips for a file that exists locally but not remotely
  • 4 round trips for a file that exists both locally and remotely, but differs in contents

The round trips of the "file exists locally but not remotely" case can be visualized as follows:

[Sequence diagram: sync sends IsDir and waits for the agent to reply false, then sends Exists and waits for false, then sends WriteOver and waits for the acknowledgement: three sequential round trips for this one file]

This is repeated over and over for every file in the source folder.

Network round trips are slow, and a fast file syncer must aim to minimize the number of trips it requires. The technique we need is pipelining: rather than waiting for every RPC to complete before proceeding with further work, we instead send our RPCs en masse without waiting, and then aggregate all the responses as they are streamed back. This allows O(n) RPCs to be made at the cost of only a single round trip's worth of network latency.

[Sequence diagram: sync sends IsDir, IsDir, IsDir back-to-back without waiting; the agent processes them in order and streams back false, true, false. All three RPCs complete within roughly one round trip's worth of latency]

17.5.1 Pipelining RPCs

The first major change is to tweak the definition of callAgent:

sync/src/Sync.scala
- def callAgent[T: upickle.Reader](rpc: Rpc): T =
+ def callAgent[T: upickle.Reader](rpc: Rpc): () => T =
    Shared.send(agent.stdin.data, rpc)
-   Shared.receive[T](agent.stdout.data)
+   () => Shared.receive[T](agent.stdout.data)
17.15.scala

Instead of immediately reading a response to deserialize and return as T, we now delay reading and instead return a zero-argument function of type () => T that can be called to read and deserialize the response later. This lets the caller of callAgent decide when to wait for each response:

  • They could wait for a response immediately by calling the function
  • Or they could send a number of Rpcs over to the agent, store the returned functions in a Buffer, and only later call every function to wait for all the responses to complete and aggregate their results, as sketched below
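In miniature, the second option looks like this (a sketch, with paths standing in for any collection of os.SubPath values):

val pending = collection.mutable.Buffer.empty[() => Boolean]
for p <- paths do pending.addOne(callAgent[Boolean](Rpc.Exists(p))) // all requests sent, none awaited
val results = pending.map(waitFor => waitFor()) // now block for the responses, in order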

Next, we need a way to batch our usage of callAgent, since we want all our RPCs (whether Rpc.Exists, Rpc.IsDir, Rpc.ReadBytes, or Rpc.WriteOver) to be pipelined. We can do this with a pipelineCalls function:

sync/src/Sync.scala
   @main def main(src0: String, dest0: String): Unit =
     ...
     def callAgent[T: upickle.Reader](rpc: Rpc): () => T = ...
-    for srcSubPath <- os.walk(src) do ...
+    val subPaths = os.walk(src).map(_.subRelativeTo(src))
+    def pipelineCalls[T: upickle.Reader](rpcFor: os.SubPath => Option[Rpc]) =
+      val buffer = collection.mutable.Buffer.empty[(os.SubPath, () => T)]
+      for p <- subPaths; rpc <- rpcFor(p) do buffer.addOne((p, callAgent[T](rpc)))
+      buffer.map((k, v) => (k, v())).toMap
17.16.scala

This function walks over the subPaths, and for each path it calls rpcFor to create an Rpc that it sends to the agent (rpcFor can return None indicating no message should be sent for a particular path). After all messages have been sent, it then calls the buffered () => T functions to wait for all of the responses, and collects them into a Map for convenience.

Note that once we send over a batch of Rpcs to the agent, it immediately starts performing the specified actions one after the other, sending responses back to the host process as each action completes. Calling the returned () => T function merely makes the host process wait until a particular Rpc has been processed and its response received: the actual processing has already started happening asynchronously in the agent.

17.5.2 Batching Pipelined Filesystem Operations

We can now write our syncing logic in terms of pipelineCalls:

sync/src/Sync.scala
   @main def main(src0: String, dest0: String): Unit =
     ...
     def pipelineCalls[T: upickle.Reader](rpcFor: os.SubPath => Option[Rpc]) = ...
+    val existsMap = pipelineCalls[Boolean](p => Some(Rpc.Exists(p)))
+    val isDirMap = pipelineCalls[Boolean](p => Some(Rpc.IsDir(p)))
+    val readMap = pipelineCalls[Array[Byte]]: p =>
+      if existsMap(p) && !isDirMap(p) then Some(Rpc.ReadBytes(p))
+      else None
+    pipelineCalls[Unit]: p =>
+      if os.isDir(src / p) then None
+      else
+        val localBytes = os.read.bytes(src / p)
+        if readMap.get(p).exists(java.util.Arrays.equals(_, localBytes)) then None
+        else Some(Rpc.WriteOver(localBytes, p))
17.17.scala

Essentially, pipelineCalls lets us turn our individual path-by-path RPCs into batched streaming RPCs that operate on the entire folder tree, while paying only a single round trip's worth of latency per batch. Rather than interleaving the calls to Exists, IsDir, ReadBytes and WriteOver, we now perform them en masse: first all the Exists calls, then all the IsDir calls, then the ReadBytes calls, then the WriteOver calls.

The final pipelined version of Sync.scala is now as follows:

sync/src/Sync.scala
package sync
object Sync:
  @main def main(src0: String, dest0: String): Unit =
    val src = os.Path(src0, os.pwd)
    val dest = os.Path(dest0, os.pwd)
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
    def callAgent[T: upickle.Reader](rpc: Rpc): () => T =
      Shared.send(agent.stdin.data, rpc)
      () => Shared.receive[T](agent.stdout.data)
    val subPaths = os.walk(src).map(_.subRelativeTo(src))
    def pipelineCalls[T: upickle.Reader](rpcFor: os.SubPath => Option[Rpc]) =
      val buffer = collection.mutable.Buffer.empty[(os.SubPath, () => T)]
      for p <- subPaths; rpc <- rpcFor(p) do buffer.addOne((p, callAgent[T](rpc)))
      buffer.map((k, v) => (k, v())).toMap
    val existsMap = pipelineCalls[Boolean](p => Some(Rpc.Exists(p)))
    val isDirMap = pipelineCalls[Boolean](p => Some(Rpc.IsDir(p)))
    val readMap = pipelineCalls[Array[Byte]]: p =>
      if existsMap(p) && !isDirMap(p) then Some(Rpc.ReadBytes(p))
      else None
    pipelineCalls[Unit]: p =>
      if os.isDir(src / p) then None
      else
        val localBytes = os.read.bytes(src / p)
        if readMap.get(p).exists(java.util.Arrays.equals(_, localBytes)) then None
        else Some(Rpc.WriteOver(localBytes, p))
17.18.scala

While pipelining may incur additional overhead (e.g. CPU time to construct these Maps and memory to store them), it reduces our previously O(number-of-files) network round trips to a constant four. That is perhaps not a big deal when syncing files locally, but it is much more significant if you're syncing files over a network where each round trip takes hundreds of milliseconds!
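To put rough numbers on it: syncing 1,000 files over a link with 100ms round trips could cost the naive version up to 4,000 round trips, i.e. several minutes of pure latency, while the pipelined version pays for just four round trips, well under a second.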

You can test this pipelined implementation the same way we tested the naive two-process implementation earlier.

See example 17.3 - Pipelined

17.6 Conclusion

In this chapter, we have walked through how to build a simple multi-process Scala application, by taking the simple local file synchronizer we built earlier and turning it into a pipelined multi-process file synchronizer. To do so, we replaced direct calls to os.* functions with RPCs that send case class instances of type Rpc over to a remote agent process, serialized as length-prefixed MessagePack binaries; the agent process handles each message and returns a response.

Finally, we re-architected the synchronization logic to pipeline it: rather than interleaving many calls to local and remote operations and waiting for each one to complete, we perform our remote filesystem operations en masse in a streaming fashion. This involved sending multiple RPCs over to the agent without waiting for a response, and only after all RPCs have been sent do we wait for them all and aggregate the responses.

This chapter should have given you a good intuition for how serialization of messages and remote procedure calls can be used to coordinate work in a multi-process Scala application. Data serialization, message passing, and RPCs of some kind are the foundation on which all distributed systems are built. While you may bump into a range of other libraries or frameworks in the wild, the general principles remain the same.

This file synchronizer is still relatively simple, with many limitations. One such limitation is that it is a batch program: we run it once, it processes and syncs all the files, and then terminates. Often, what a user wants is a real-time file synchronizer, that constantly keeps the local and remote filesystems in sync even as the local files keep changing. This is something we will re-visit in Chapter 18: Building a Real-time File Synchronizer.

Exercise: Modify the multi-process file synchronizer from this chapter to support deletions: any file or folder that is present in the destination folder and not present in the source folder should be deleted from the destination when the synchronizer is run.
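One possible shape for a solution, as a hypothetical sketch: the WalkDest and Delete messages below are invented for illustration and are not part of the chapter's code. The idea is to add an RPC that lists the destination tree and one that deletes a path, then remove every remote path with no local counterpart:

// hypothetical additional cases in shared/src/Rpc.scala:
//   case WalkDest()               // agent replies with Seq[os.SubPath] under os.pwd
//   case Delete(path: os.SubPath) // agent runs os.remove.all and replies with Unit
// in sync/src/Sync.scala, after the existing syncing logic:
val localPaths = subPaths.toSet
for
  remote <- callAgent[Seq[os.SubPath]](Rpc.WalkDest())()
  if !localPaths.contains(remote)
do
  callAgent[Unit](Rpc.Delete(remote))()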

See example 17.4 - Deletes

Exercise: In distributed multi-process applications the greatest bottleneck is often the latency and bandwidth of the network. Add Gzip compression to the data we are exchanging between Sync and Agent by wrapping the subprocess' input and output streams in java.util.zip.GZIPInputStream and GZIPOutputStream.

Make sure to construct the GZIPOutputStream with syncFlush = true to avoid unwanted buffering, and that the GZIPInputStreams are only constructed once some data has been written to the other end of the stream. You may find Scala's lazy val syntax useful, as a convenient way to define vals whose evaluation is deferred until they are first referenced.
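A minimal sketch of the sync side, shown for the non-pipelined callAgent and assuming the agent applies the mirror-image wrapping to its own System.in and System.out (names like zippedOut are invented):

import java.io.{DataInputStream, DataOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
// syncFlush = true makes each flush() push the current message through the compressor
val zippedOut = DataOutputStream(GZIPOutputStream(agent.stdin.data, true))
// lazy, because the GZIPInputStream constructor blocks reading the gzip header,
// which the agent only writes once it sends its first response
lazy val zippedIn = DataInputStream(GZIPInputStream(agent.stdout.data))
def callAgent[T: upickle.Reader](rpc: Rpc): T =
  Shared.send(zippedOut, rpc)
  Shared.receive[T](zippedIn)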

See example 17.5 - Gzip

Exercise: Use ssh to run the Agent process on a separate computer, allowing the program to synchronize files over to a remote computer over the network.

You can use ssh <host> <cmd> to run a command on a remote host, and invoke that via os.call() or os.spawn() to do so from your Scala codebase. Note that you will need to install java on the remote host, copy the agent.jar over e.g. via scp, and any local commands like mkdir or os.perms.set will need to be run on the remote computer via ssh.
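A hypothetical sketch of the spawn call (host name and remote paths invented; agent.jar is assumed to have been copied over beforehand, e.g. via scp):

val agent = os.proc("ssh", "user@devbox", "cd dest && java -jar ~/agent.jar").spawn()

Since the rest of the sync code only ever talks to agent.stdin and agent.stdout, nothing else needs to change.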

See example 17.6 - Ssh
Discuss Chapter 17 online at https://www.handsonscala.com/discuss/17