17.1 Two-Process Build Setup
17.2 Remote Procedure Calls
17.3 The Agent Process
17.4 The Sync Process
17.5 Pipelined Syncing
def send[T: Writer](out: DataOutputStream, msg: T): Unit =
val bytes = upickle.writeBinary(msg)
out.writeInt(bytes.length)
out.write(bytes)
out.flush()
def receive[T: Reader](in: DataInputStream) =
val buf = new Array[Byte](in.readInt())
in.readFully(buf)
upickle.readBinary[T](buf)
17.1.scala
Snippet 17.1: RPC send and receive methods for sending data over an operating system pipe or network
While all our programs so far have run within a single process, in real-world scenarios you will be working as part of a larger system, and the application itself may need to be split into multiple processes. This chapter will walk you through how to do so: configuring your build tool to build multiple Scala programs, sharing code between them, and exchanging serialized messages between running processes. These are the building blocks that form the foundation of any distributed system.
As this chapter's project, we will be building a simple multi-process file synchronizer that can work over a network. This chapter builds upon the simple single-process file synchronizer in Chapter 7: Files and Subprocesses, and will form the basis for Chapter 18: Building a Real-time File Synchronizer.
In Chapter 7: Files and Subprocesses, we learned how to work with the
filesystem using Scala, and wrote a simple function that synchronizes the
contents of two folders by comparing the files on each side and copying files as
necessary from one to the other. However, there was one big limitation of the
def sync function we wrote: it can only work if both src and dest folders
are on the same computer! This is because it relies on having direct access to
read and write to the files in both src and dest.
This of course rules out possibly the most common use case of file syncing: synchronizing the contents of a folder between two computers across a network. To allow for remote file syncing, we will need a slightly different approach:
- A sync process with direct access to the src/ folder will spawn an agent process with direct access to the dest/ folder
- When sync needs to query or perform some action on the dest/ folder, it will send a message to agent via the agent's standard input stream, and the agent will reply via its standard output stream
For the purposes of this exercise we will be running both sync and agent on the same computer, with the two processes connected by an operating system pipe. While the snippets in this chapter run both processes on the same computer, it is straightforward to extend this to a remote scenario by e.g. spawning the child process over an SSH connection.
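For example, if the agent executable had already been copied onto the remote machine, the spawn call might look something like the following sketch, with a hypothetical host and path (the actual local spawning code appears later in this chapter):

// the SSH session's stdin/stdout takes the place of the local pipe
val agent = os.spawn(cmd = ("ssh", "user@remote-host", "./agent.jar"))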
We will set up this project using the Mill build tool. You can install Mill in your current project as follows:
$ REPO=https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/1.0.6
$ curl -L "$REPO/mill-dist-1.0.6-mill.sh" -o mill
$ chmod +x mill
17.2.bash
We will start with the following build.mill file:
build.mill

package build
import mill.*, scalalib.*

trait SyncModule extends ScalaModule:
  def scalaVersion = "3.7.3"
  def mvnDeps = Seq(
    mvn"com.lihaoyi::upickle:4.4.2",
    mvn"com.lihaoyi::os-lib:0.11.6"
  )

object shared extends SyncModule

object sync extends SyncModule:
  def moduleDeps = Seq(shared)
  object test extends ScalaTests, TestModule.Utest:
    def mvnDeps = Seq(mvn"com.lihaoyi::utest:0.9.4")

object agent extends SyncModule:
  def moduleDeps = Seq(shared)

17.3.scala
This build file defines two application modules, sync and agent, each with its respective sync/src/ and agent/src/ folder. Both modules share the same Scala version and dependencies on the uPickle and OS-Lib libraries, as well as a dependency on a shared module with its corresponding shared/src/ folder. You will need to manually create the sync/src/, agent/src/, and shared/src/ folders:
$ mkdir -p agent/src sync/src shared/src
You can then run $ ./mill --bsp-install to load the project into your IDE.
First let us take the Scala CLI REPL snippet we wrote in Chapter 7: Files and Subprocesses and integrate it into our Mill project. That involves taking the original def sync function and converting it into a proper @main def main program entrypoint that takes its source and destination folders as command line arguments:
sync/src/Sync.scala

-def sync(src: os.Path, dest: os.Path) =
+package sync
+object Sync:
+  @main def main(src0: String, dest0: String): Unit =
+    val src = os.Path(src0, os.pwd)
+    val dest = os.Path(dest0, os.pwd)
 for srcSubPath <- os.walk(src) do
   val subPath = srcSubPath.subRelativeTo(src)
   val destSubPath = dest / subPath
   (os.isDir(srcSubPath), os.isDir(destSubPath)) match
     case (false, true) | (true, false) => os.copy.over(srcSubPath, destSubPath)
     case (false, false)
         if !os.exists(destSubPath)
         || !os.read.bytes(srcSubPath).sameElements(os.read.bytes(destSubPath)) =>
       os.copy.over(srcSubPath, destSubPath, createFolders = true)
     case _ => // do nothing

17.4.scala
We can then run the following commands to package the compiled code into a
self-contained executable, often called an assembly. Once packaged, we can run
the executable manually to sync the files:
$ ./mill show sync.assembly
"ref:v0:1845e552:/Users/lihaoyi/test/out/sync/assembly.dest/out.jar"
$ find sync -type f
sync/src/Sync.scala
$ mkdir test
$ out/sync/assembly.dest/out.jar sync test
$ find test -type f
test/src/Sync.scala
17.5.bash

The next step is to take our various os.* calls in the sync process on the
dest folder, and convert them into remote procedure calls (RPCs) that
communicate with an agent process that can respond with the results. The calls
we need to care about are:
- os.isDir(destSubPath)
- os.exists(destSubPath)
- os.read.bytes(destSubPath)
- os.copy.over(srcSubPath, destSubPath, createFolders = true)

To convert these into remote procedure calls, we need to define the messages that our two processes will exchange, and the protocol which the two processes will use to exchange them.
We will put these messages in shared/src/Rpc.scala, since they will be used
by both sync and agent processes:
shared/src/Rpc.scala

package sync
import upickle.{readwriter, ReadWriter}

given subPathRw: ReadWriter[os.SubPath] =
  readwriter[String].bimap[os.SubPath](_.toString, os.SubPath(_))

enum Rpc derives ReadWriter:
  case IsDir(path: os.SubPath)
  case Exists(path: os.SubPath)
  case ReadBytes(path: os.SubPath)
  case WriteOver(src: Array[Byte], path: os.SubPath)

17.6.scala
Here we are defining Rpc as an enum, because we know upfront that there are only a fixed number of messages we will want to pass between the sync and agent processes. We also add derives ReadWriter to the enum, which defines a given ReadWriter covering every case, allowing serialization to JSON or MessagePack binaries, as we saw earlier in Chapter 8: JSON and Binary Data Serialization. We will need this in order to send these messages from one process to the other.
Note that we also need to define a subPathRw given, in order to handle the os.SubPaths that are part of the cases we are serializing. Since os.SubPath can be easily converted to and from Strings, we make use of bimap to convert a ReadWriter[String] into the ReadWriter[os.SubPath] we need.
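For instance, here is an illustrative REPL-style round trip (assuming the subPathRw given above is in scope), showing that an os.SubPath is serialized as a plain string:

val json = upickle.write(os.sub / "folder" / "file.txt")
// json == "\"folder/file.txt\""
val path = upickle.read[os.SubPath](json)
// path == os.sub / "folder" / "file.txt"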
Apart from the messages themselves, we need a standard way of sending and
receiving Rpc messages between the two processes. Operating system processes
communicate via "pipes", which are exposed to a Scala application as a
java.io.InputStream you can read from and a java.io.OutputStream you can
write to. These are streams of bytes, on top of which we'll need to define a
protocol for sending our case class messages.
As our protocol for sending messages over these byte streams, we will send each
message in a length-prefixed fashion: every message will be preceded by a single
4-byte Int that tells the agent how long that message is, allowing it to
read the correct number of bytes to pass into upickle.readBinary to
deserialize into an Rpc object.
We assume that anyone receiving data over a stream of bytes will receive a continuous stream of these messages, each one preceded by a 4-byte header telling it how many bytes that message contains. This can be implemented as follows:
shared/src/Shared.scala

package sync
import upickle.{Reader, Writer}

object Shared:
  def send[T: Writer](out: java.io.DataOutputStream, msg: T): Unit =
    val bytes = upickle.writeBinary(msg)
    out.writeInt(bytes.length)
    out.write(bytes)
    out.flush()

  def receive[T: Reader](in: java.io.DataInputStream) =
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    upickle.readBinary[T](buf)

17.7.scala
This protocol for sending messages via DataOutputStreams and receiving
messages via DataInputStreams is implemented by the send and receive
methods above. We use DataInputStreams and DataOutputStreams as they provide
some conveniences e.g. for reading/writing 4-byte integers. We put this logic in
shared/ because both the sync and agent processes will need to send
messages to one another.
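As a quick sanity check, we can exercise send and receive against an in-memory byte array rather than a real pipe. The following is a minimal sketch, assuming it is compiled within the same sync package so that Rpc and Shared are visible; note that writeInt emits the length big-endian, so e.g. a 3-byte payload is framed as 00 00 00 03 followed by the payload bytes:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

val bytesOut = new ByteArrayOutputStream()
// frame one Rpc: 4-byte length header followed by the MessagePack payload
Shared.send(new DataOutputStream(bytesOut), Rpc.Exists(os.sub / "file.txt"))

// read the same bytes back, as the agent would from its standard input
val in = new DataInputStream(new ByteArrayInputStream(bytesOut.toByteArray))
val decoded = Shared.receive[Rpc](in) // Rpc.Exists(os.sub / "file.txt")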
Now that we've defined the case classes that will represent our RPCs, we can implement our agent process. At a high level, the agent process loops over the following steps:

- Receive an Rpc from the sync process
- Do what the Rpc wants it to do: read some bytes, over-write a file, etc.
- Send the result back to the sync process

We need some way to exchange data. The sync process that spawns agent will
have access to agent's System.in and System.out streams, so we can use
those two streams to exchange data between the two processes:
agent/src/Agent.scala

package sync

object Agent:
  @main def run(): Unit =
    val input = java.io.DataInputStream(System.in)
    val output = java.io.DataOutputStream(System.out)
    while true do
      try
        val rpc = Shared.receive[Rpc](input)
      catch
        case e: java.io.EOFException => System.exit(0)

17.8.scala
Above, we wrap System.in and System.out in a DataInputStream and DataOutputStream respectively, to be compatible with the send/receive methods we defined earlier. Inside a loop, we call receive to read a single Rpc's worth of bytes from sync and deserialize it into an Rpc object. Lastly, we match on the rpc, do what it tells us to do, and write the result back to sync using Shared.send:
agent/src/Agent.scala

 while true do
   try
     val rpc = Shared.receive[Rpc](input)
+    rpc match
+      case Rpc.IsDir(path) => Shared.send(output, os.isDir(os.pwd / path))
+      case Rpc.Exists(path) => Shared.send(output, os.exists(os.pwd / path))
+      case Rpc.ReadBytes(path) => Shared.send(output, os.read.bytes(os.pwd / path))
+      case Rpc.WriteOver(bytes, path) =>
+        os.remove.all(os.pwd / path)
+        Shared.send(output, os.write.over(os.pwd / path, bytes, createFolders = true))
   catch
     case e: java.io.EOFException => System.exit(0)

17.9.scala
For now, we assume that the agent process is spawned with its os.pwd set to
the destination folder we want to sync files to.
The try-catch block around the body of the while loop makes sure that when
the sync process terminates, the agent process exits cleanly without a stack
trace. Our two-process file synchronizer can still work without the
try-catch, but will print an ugly stack trace to the console when syncing is
complete and the Agent shuts down.
The agent process is intentionally kept simple, running operations on behalf
of the sync process that it is not able to run itself. The agent only knows
how to handle one operation at a time, report the result, and nothing else: the
complexity will live in the sync process, which we will cover next.
First, we need to give sync access to the agent executable. We can do
this by adding agent.assembly to sync's resources in build.mill:
build.mill

 object sync extends SyncModule:
   def moduleDeps = Seq(shared)
+  def resources = Task {
+    os.copy(agent.assembly().path, Task.dest / "agent.jar")
+    super.resources() ++ Seq(PathRef(Task.dest))
+  }

17.10.scala
This ensures that whenever the sync module is packaged and run, the agent's
assembly executable is prepared and ready to be loaded for use as agent.jar.
We can then use os.read.bytes in sync's Scala code to load the binary
contents of agent.jar, write it to a file, and execute it via os.spawn
to create a child process we can communicate with:
sync/src/Sync.scala

 val src = os.Path(src0, os.pwd)
 val dest = os.Path(dest0, os.pwd)
+val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
+os.perms.set(agentExecutable, "rwx------")
+val agent = os.spawn(cmd = agentExecutable, cwd = dest)
 for srcSubPath <- os.walk(src) do
   val subPath = srcSubPath.subRelativeTo(src)
   val destSubPath = dest / subPath

17.11.scala
This gives us a val agent that represents the running agent subprocess, which is an os.SubProcess object like those in Chapter 7: Files and Subprocesses. Note that we need to write the agent.jar to a temporary file before executing it: on the JVM, packaged resource files are zipped into the enclosing sync executable and cannot be executed directly.
To make it more convenient to exchange messages with agent, we define a
callAgent function that writes a message to agent using send,
and reads the agent's response using receive:
sync/src/Sync.scala

 val agent = os.spawn(cmd = agentExecutable, cwd = dest)
+def callAgent[T: upickle.Reader](rpc: Rpc): T =
+  Shared.send(agent.stdin.data, rpc)
+  Shared.receive[T](agent.stdout.data)
 for srcSubPath <- os.walk(src) do

17.12.scala
callAgent is parameterized on the type T that we expect in response to the
Rpc we send, and T must be a type with a pre-defined
upickle.Reader.
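For example, with a hypothetical path, a caller could query the agent and then fetch a file's contents as follows, paying one round trip per call:

// returns true if dest/folder/file.txt exists on the agent's side
val exists = callAgent[Boolean](Rpc.Exists(os.sub / "folder" / "file.txt"))
// returns the raw bytes of the remote file
val bytes = callAgent[Array[Byte]](Rpc.ReadBytes(os.sub / "folder" / "file.txt"))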
Next, we need to update the filesystem walk-compare-copy logic to change all
os.* calls that operate on the dest path to instead use callAgent to ask
the agent process to do the work for us:
sync/src/Sync.scala

 val subPath = srcSubPath.subRelativeTo(src)
 val destSubPath = dest / subPath
-(os.isDir(srcSubPath), os.isDir(destSubPath)) match
+(os.isDir(srcSubPath), callAgent[Boolean](Rpc.IsDir(subPath))) match
-  case (false, true) | (true, false) => os.copy.over(srcSubPath, destSubPath)
+  case (false, true) =>
+    callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
+  case (true, false) =>
+    for p <- os.walk(srcSubPath) if os.isFile(p) do
+      callAgent[Unit](Rpc.WriteOver(os.read.bytes(p), p.subRelativeTo(src)))
   case (false, false)
-      if !os.exists(destSubPath)
-      || !os.read.bytes(srcSubPath).sameElements(os.read.bytes(destSubPath)) =>
+      if !callAgent[Boolean](Rpc.Exists(subPath))
+      || !os.read.bytes(srcSubPath).sameElements(
+        callAgent[Array[Byte]](Rpc.ReadBytes(subPath))
+      ) =>
-    os.copy.over(srcSubPath, destSubPath, createFolders = true)
+    callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
   case _ => // do nothing

17.13.scala
This is a relatively mechanical change, except for the (true, false) case:
this is the case where a local path is a folder but the remote path is not.
Previously we were relying on os.copy.over to recursively copy the source
folder over the destination, but now we have to manually send individual
WriteOver commands, one for each file in the source folder, to make sure they
all get copied to the destination.
The working Sync.scala code we have written now looks like this:
sync/src/Sync.scala

package sync

object Sync:
  @main def main(src0: String, dest0: String): Unit =
    val src = os.Path(src0, os.pwd)
    val dest = os.Path(dest0, os.pwd)
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)

    def callAgent[T: upickle.Reader](rpc: Rpc): T =
      Shared.send(agent.stdin.data, rpc)
      Shared.receive[T](agent.stdout.data)

    for srcSubPath <- os.walk(src) do
      val subPath = srcSubPath.subRelativeTo(src)
      val destSubPath = dest / subPath
      (os.isDir(srcSubPath), callAgent[Boolean](Rpc.IsDir(subPath))) match
        case (false, true) =>
          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
        case (true, false) =>
          for p <- os.walk(srcSubPath) if os.isFile(p) do
            callAgent[Unit](Rpc.WriteOver(os.read.bytes(p), p.subRelativeTo(src)))
        case (false, false)
            if !callAgent[Boolean](Rpc.Exists(subPath))
            || !os.read.bytes(srcSubPath).sameElements(
              callAgent[Array[Byte]](Rpc.ReadBytes(subPath))
            ) =>
          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
        case _ => // do nothing

17.14.scala
We can test this new multi-process application with both agent and sync, as
we saw earlier in Testing our File Syncer (17.1.2), to verify
that it works similarly to the earlier one-process version. The big difference
is that now the only point of contact between agent and sync is the
agent's stdin and stdout.
This split makes it possible to take the two processes and have them run on
different computers: e.g. sync may run on your laptop, and agent may run on
a cloud server you are trying to sync files to. As long as you can provide a
stdin/stdout interface (e.g. by running the agent over
SSH) you would now be able to sync
files across the network.
While we now have a simple two-process network-compatible file synchronizer, it behaves roughly the same way as our original one-process file synchronizer: for each file in the source folder, it reads the corresponding file in the destination folder, compares the two, and copies the source file over the destination if necessary. While this works, it is an exceedingly "chatty" protocol: the syncer makes multiple network round trips to sync each file. In the "file exists locally but not remotely" case, for example, the syncer makes one round trip each for IsDir, Exists, and WriteOver, and this is repeated over and over for every file in the source folder.
Network round trips are slow, and a fast file syncer must aim to minimize the number of trips we require. The technique we need is Pipelining: rather than waiting for every RPC to complete before proceeding with further work, we instead send our RPCs en-masse without waiting, and then aggregate all the responses as they are streamed back. This can allow O(n) RPCs to be made at the cost of only a single round trip's worth of network latency.
The first major change is to tweak the definition of callAgent:
sync/src/Sync.scala

-def callAgent[T: upickle.Reader](rpc: Rpc): T =
+def callAgent[T: upickle.Reader](rpc: Rpc): () => T =
   Shared.send(agent.stdin.data, rpc)
-  Shared.receive[T](agent.stdout.data)
+  () => Shared.receive[T](agent.stdout.data)

17.15.scala
Instead of immediately reading a response to deserialize and return as T, we
now delay reading and instead return a zero-argument function of type () => T
that you can call to read and deserialize the response later. This lets the
caller of callAgent decide when they want to call it:
- They can call the returned function immediately, waiting for each response before proceeding, as before
- Or they can fire off a whole batch of Rpcs over to the agent, store the functions in a Buffer, and only later call every function to wait for all the responses to complete and aggregate their results

Next, we need a way to batch our usage of callAgent, since we want all our
RPCs (whether Rpc.Exists, Rpc.IsDir, Rpc.ReadBytes, or Rpc.WriteOver) to
be pipelined. We can do this with a pipelineCalls function:
def main(src0: String, dest0: String): Unit =
...
def callAgent[T: upickle.Reader](rpc: Rpc): () => T = ...
- for srcSubPath <- os.walk(src) do ...
+ val subPaths = os.walk(src).map(_.subRelativeTo(src))
+ def pipelineCalls[T: upickle.Reader](rpcFor: os.SubPath => Option[Rpc]) =
+ val buffer = collection.mutable.Buffer.empty[(os.RelPath, () => T)]
+ for p <- subPaths; rpc <- rpcFor(p) do buffer.addOne((p, callAgent[T](rpc)))
+ buffer.map((k, v) => (k, v())).toMap
17.16.scala
This function walks over the subPaths, and for each path it calls rpcFor to
create an Rpc that it sends to the agent (rpcFor can return None
indicating no message should be sent for a particular path). After all messages
have been sent, it then calls the buffered () => T functions to wait for all
of the responses, and collects them into a Map for convenience.
Note that once we send over a batch of Rpcs to the agent, it immediately
starts performing the specified actions one after the other, and sending
responses back to the host process as each action completes. Calling the
returned () => T function is for the host process to wait until a particular
Rpc has been processed and its response received, but the actual processing
has already started happening asynchronously in the agent.
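We can illustrate this with a pair of standalone calls on hypothetical paths; pipelineCalls simply does the same thing in bulk:

// both Rpcs are written to the agent's stdin immediately, back to back
val existsA = callAgent[Boolean](Rpc.Exists(os.sub / "a.txt"))
val existsB = callAgent[Boolean](Rpc.Exists(os.sub / "b.txt"))
// only now do we block on the responses, reading them in the order the
// requests were sent, which is also the order the agent replies in
val a = existsA()
val b = existsB()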
We can now write our syncing logic in terms of pipelineCalls:
sync/src/Sync.scala

 def main(src0: String, dest0: String): Unit =
   ...
   def pipelineCalls[T: upickle.Reader](rpcFor: os.SubPath => Option[Rpc]) = ...
+  val existsMap = pipelineCalls[Boolean](p => Some(Rpc.Exists(p)))
+  val isDirMap = pipelineCalls[Boolean](p => Some(Rpc.IsDir(p)))
+  val readMap = pipelineCalls[Array[Byte]]: p =>
+    if existsMap(p) && !isDirMap(p) then Some(Rpc.ReadBytes(p))
+    else None
+  pipelineCalls[Unit]: p =>
+    if os.isDir(src / p) then None
+    else
+      val localBytes = os.read.bytes(src / p)
+      if readMap.get(p).exists(java.util.Arrays.equals(_, localBytes)) then None
+      else Some(Rpc.WriteOver(localBytes, p))

17.17.scala
Essentially, pipelineCalls lets us turn our individual path-by-path RPCs into
batch streaming RPCs that operate on the entire folder tree, while only paying
for a single network round trip. Rather than interleaving the calls to Exists,
IsDir, ReadBytes and WriteOver, we now perform them all en-masse: first
make all the Exists calls, then all the IsDir calls, ReadBytes calls,
WriteOver calls.
The final pipelined version of Sync.scala is now as follows:
sync/src/Sync.scala

package sync

object Sync:
  @main def main(src0: String, dest0: String): Unit =
    val src = os.Path(src0, os.pwd)
    val dest = os.Path(dest0, os.pwd)
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)

    def callAgent[T: upickle.Reader](rpc: Rpc): () => T =
      Shared.send(agent.stdin.data, rpc)
      () => Shared.receive[T](agent.stdout.data)

    val subPaths = os.walk(src).map(_.subRelativeTo(src))

    def pipelineCalls[T: upickle.Reader](rpcFor: os.SubPath => Option[Rpc]) =
      val buffer = collection.mutable.Buffer.empty[(os.RelPath, () => T)]
      for p <- subPaths; rpc <- rpcFor(p) do buffer.addOne((p, callAgent[T](rpc)))
      buffer.map((k, v) => (k, v())).toMap

    val existsMap = pipelineCalls[Boolean](p => Some(Rpc.Exists(p)))
    val isDirMap = pipelineCalls[Boolean](p => Some(Rpc.IsDir(p)))
    val readMap = pipelineCalls[Array[Byte]]: p =>
      if existsMap(p) && !isDirMap(p) then Some(Rpc.ReadBytes(p))
      else None

    pipelineCalls[Unit]: p =>
      if os.isDir(src / p) then None
      else
        val localBytes = os.read.bytes(src / p)
        if readMap.get(p).exists(java.util.Arrays.equals(_, localBytes)) then None
        else Some(Rpc.WriteOver(localBytes, p))

17.18.scala
While pipelining may incur additional overhead (e.g. CPU time to construct these
Maps and memory to store them) it reduces our previously O(number-of-files)
network round trips to just O(4). Perhaps not a big deal when syncing files
locally, but much more significant if you're syncing files over a network and
each round trip is 100s of milliseconds!
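To put illustrative numbers on it: syncing 1,000 files over a link with 100 milliseconds of round-trip latency costs on the order of 1,000 × 3 × 100ms ≈ 5 minutes with the naive per-file protocol (roughly three RPC round trips per file), versus about 4 × 100ms ≈ 0.4 seconds of latency overhead for the pipelined version, no matter how many files are involved.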
You can test this pipelined implementation the same way we tested the naive two-process implementation earlier.
In this chapter, we have walked through how to build a simple multi-process
Scala application, by taking the simple local file synchronizer we built earlier
and turning it into a pipelined multi-process file synchronizer. To do so, we
replaced direct calls to os.* functions with RPCs that send case class
instances of type Rpc over to a remote agent process, serialized as
length-prefixed MessagePack binaries, and the agent process handles each message
and returns a response.
Finally, we re-architected the synchronization logic to pipeline it: rather than interleaving many calls to local and remote operations and waiting for each one to complete, we perform our remote filesystem operations en-masse in a streaming fashion. This involved sending multiple RPCs over to the agent without waiting for a response, and then only after all RPCs have been sent do we wait for them all and aggregate the responses.
This chapter should have given you a good intuition for how serialization of messages and remote procedure calls can be used to coordinate work in a multi-process Scala application. Data serialization, message passing, and RPCs of some kind are the foundation on which all distributed systems are built. While you may bump into a range of other libraries or frameworks in the wild, the general principles would be the same.
This file synchronizer is still relatively simple, with many limitations. One such limitation is that it is a batch program: we run it once, it processes and syncs all the files, and then terminates. Often, what a user wants is a real-time file synchronizer, that constantly keeps the local and remote filesystems in sync even as the local files keep changing. This is something we will re-visit in Chapter 18: Building a Real-time File Synchronizer.
Exercise: Modify the multi-process file synchronizer from this chapter to support deletions: any file or folder that is present in the destination folder and not present in the source folder should be deleted from the destination when the synchronizer is run.
See example 17.4 - Deletes

Exercise: In distributed multi-process applications the greatest bottleneck is often the
latency and bandwidth of the network. Add Gzip compression to the data we are
exchanging between Sync and Agent by wrapping the subprocess' input and
output streams in java.util.zip.GZIPInputStream and GZIPOutputStream.
Make sure to construct the GZIPOutputStream with syncFlush = true to avoid
unwanted buffering, and that the GZIPInputStreams are only constructed once
some data has been written to the other end of the stream. You may find
Scala's lazy val syntax useful, as a convenient way to define vals whose
evaluation is deferred until they are referenced.
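One possible shape for the wrapping on the sync side, as a sketch only (the agent side would mirror it around System.in and System.out):

// syncFlush = true makes each flush() push compressed bytes through immediately
val gzipOut = new java.io.DataOutputStream(
  new java.util.zip.GZIPOutputStream(agent.stdin.data, true)
)
// lazy, because GZIPInputStream's constructor reads the gzip header, which
// blocks until the agent has written its first bytes to the stream
lazy val gzipIn = new java.io.DataInputStream(
  new java.util.zip.GZIPInputStream(agent.stdout.data)
)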
Exercise: Use ssh to run the Agent process on a separate computer, allowing the
program to synchronize files over to a remote computer over the network.
You can use ssh <host> <cmd> to run a command on a remote host, and invoke
that via os.call() or os.spawn() to do so from your Scala
codebase. Note that you will need to install java on the remote host and copy the agent.jar over, e.g. via scp; any local commands like mkdir or os.perms.set will need to be run on the remote computer via ssh.