18.1 Watching for Changes
18.2 Real-time Syncing with Actors
18.3 Testing the Syncer
18.4 Pipelined Real-time Syncing
18.5 Testing the Pipelined Syncer
object SyncActor extends castor.SimpleActor[Msg]:
  def run(msg: Msg): Unit = msg match
    case ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
    case AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
      val localHash = Shared.hashPath(src / p)
      if localHash != remoteHash && localHash.isDefined then
        Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
18.1.scala
Snippet 18.1: an actor used as part of our real-time file synchronizer
In this chapter, we will write a file synchronizer that can keep the destination folder up to date even as the source folder changes over time. This chapter serves as a capstone project, tying together concepts from Chapter 17: Multi-Process Applications and Chapter 16: Message-based Parallelism with Actors.
The techniques in this chapter form the basis for "event-driven" architectures, which are common in many distributed systems. Real-time file synchronization is a difficult problem, and we will see how we can use the Scala language and libraries to approach it in an elegant and understandable way.
The file synchronizers we have seen in previous chapters were fundamentally
batch processes. They assume all the files on disk are static, read all files
in the source and destination folders, compare them, and decide which files they
need to sync over. Both the single-process and multi-process versions of our
file synchronizer worked the same way, except in the multi-process version we
had to make RPCs to the agent process to perform actions such as querying or
writing files to the destination folder.
This batch syncing works well for some usage patterns - such as one-off or nightly filesystem backups - but less well for folders with many small changes that you want to sync over quickly. In such a scenario, our batch syncer would repeatedly walk the entire folder and scan large numbers of unchanged files, only to perform a small number of useful changes. This can greatly slow down the process of syncing one small change from the source folder to the destination.
The key to converting our batch file syncer into an efficient real-time version is the os.watch API:
os.watch.watch(roots: Seq[os.Path], onEvent: Set[os.Path] => Unit): Unit
os.watch allows you to register an onEvent listener on one or more root
folders: every time a file or folder is changed within one of those folders,
onEvent is called with the set of paths that changed. This allows you to
efficiently detect changes within a folder, without needing to repeatedly scan
the contents of files and folders to see if anything changed.
os.watch uses FSEvents on macOS, and
relies on the JVM's builtin support for
inotify on Linux or WSL2. At time of
writing, it does not support watching for file changes on Windows.
You can import this API in the Scala CLI REPL to try it out, registering a watch and then performing filesystem actions to see the watch get triggered.
$ scala --import com.lihaoyi::os-lib-watch:0.11.5 --repl
> os.makeDir(os.pwd / "out")
> os.watch.watch(
Seq(os.pwd / "out"),
paths => println("paths changed: " + paths.mkString(", "))
)
> os.write(os.pwd / "out/i am", "cow")
paths changed: /Users/lihaoyi/test/out/i am
> os.move(os.pwd / "out" / "i am", os.pwd / "out/hear me")
paths changed: /Users/lihaoyi/test/out/i am, /Users/lihaoyi/test/out/hear me
18.2.scala
Note that os.watch spawns a separate thread to watch the filesystem and call
the onEvent callback. Thus you need to make sure the onEvent callback is
safe to run in a multi-threaded environment.
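One simple way to satisfy this requirement is to have the callback do nothing except hand the changed paths off to a thread-safe queue, leaving all real work to a single consuming thread. Here is a minimal sketch, assuming an out/ folder like the one in the REPL session above; the synchronizer we build below achieves the same single-consumer guarantee by forwarding events to an actor instead:

import java.util.concurrent.LinkedBlockingQueue

// The watcher thread only enqueues paths; LinkedBlockingQueue is safe to call
// from multiple threads, so the callback itself does no unsynchronized work
val changed = new LinkedBlockingQueue[os.Path]()
os.watch.watch(Seq(os.pwd / "out"), paths => paths.foreach(changed.put))

while true do
  val p = changed.take() // blocks until the watcher thread enqueues a change
  println(s"processing change: $p")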
At a high level, our real-time file synchronizer will be architected as follows:
First, we feed the results of os.watch's onEvent callback into
SyncActor. This is an actor that processes incoming messages one at a time
Next, SyncActor will send a StatPath message to the Agent to ask for
the hash for the file at a particular path
Agent will query the destination folder for the hash of that path, and
return it as a StatInfo message to SyncActor
SyncActor compares the source and destination hashes. If they differ,
SyncActor sends WriteOver messages to Agent, containing the contents of
files we wish to copy over
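Putting these steps together, the message flow looks roughly as follows. This is just a sketch of the architecture described above; the message and thread names are the ones we will define later in this chapter:

os.watch thread ---ChangedPath---> SyncActor ---Rpc.StatPath---> Agent
Agent ---Rpc.StatInfo---> agentReader thread ---AgentResponse---> SyncActor
SyncActor ---Rpc.WriteOver---> Agent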
Rather than treating the file synchronizer as a one-off program that reads data, performs some action, and then exits, we are treating it as a pipeline: for every file change in the source folder, it will go through these steps. Thus it doesn't matter if the files continue changing while our sync is in progress, or if more files change after the sync is complete: we can feed those file changes through the same pipeline and bring the destination folder up to date.
The insight here is that the various files we are syncing are mostly
independent, but steps like sending messages to the Agent need to happen in a
single-threaded fashion, and updates to any particular file need to have their
order preserved. This makes it a good problem to model using Actors.
Note that the fact that SyncActor is an actor handling messages one after
another is important. SyncActor is receiving messages from two different
sources concurrently - both os.watch and Agent - while also executing its
own logic. Making it an actor ensures that these messages are processed
sequentially, letting us avoid thinking about race conditions, memory models and
other multi-threading-related issues.
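As a standalone illustration of this property, unrelated to the synchronizer's own message types, consider a trivial counter actor: four threads send it messages concurrently, but because castor executes run for one message at a time, the mutable total needs no locks. This is a minimal sketch, assuming the castor dependency is on the classpath:

import castor.Context.Simple.global

object Counter extends castor.SimpleActor[Int]:
  var total = 0
  // `run` is only ever executed for one message at a time,
  // so this unsynchronized mutation is safe
  def run(msg: Int): Unit = total += msg

@main def demo(): Unit =
  val threads = Seq.fill(4)(Thread(() => for _ <- 1 to 1000 do Counter.send(1)))
  threads.foreach(_.start())
  threads.foreach(_.join())
  Thread.sleep(1000) // crudely wait for the actor's mailbox to drain
  println(Counter.total) // expected: 4000 once the mailbox has drained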
First, let's define our build.mill file:
build.mill
//| mill-version: 1.0.6
import mill.*, scalalib.*

trait SyncModule extends ScalaModule:
  def scalaVersion = "3.7.3"
  def mvnDeps = Seq(
    mvn"com.lihaoyi::upickle:4.4.2",
    mvn"com.lihaoyi::os-lib:0.11.6",
    mvn"com.lihaoyi::os-lib-watch:0.11.5",
    mvn"com.lihaoyi::castor:0.3.0"
  )

object sync extends SyncModule:
  def moduleDeps = Seq(shared)
  def resources = Task {
    os.copy(agent.assembly().path, Task.dest / "agent.jar")
    super.resources() ++ Seq(PathRef(Task.dest))
  }
  object test extends ScalaTests with TestModule.Utest:
    def mvnDeps = Seq(mvn"com.lihaoyi::utest:0.9.4")

object agent extends SyncModule:
  def moduleDeps = Seq(shared)

object shared extends SyncModule
18.3.scala
This is the configuration used by the Mill build tool to compile our code. It
defines three modules: sync, agent, and shared. The configuration is
mostly similar to that used in Chapter 17: Multi-Process Applications, except
for the addition of two external dependencies:
com.lihaoyi::os-lib-watch:0.11.5: this provides the filesystem-watching
functionality we will use to detect changes in the source folder we need to
sync to the destination
com.lihaoyi::castor:0.3.0: this is a small actor library, introduced in
Chapter 16: Message-based Parallelism with Actors, that we will use to set
up our actor pipelines
Next, we need to download the Mill launcher:
$ REPO=https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/1.0.6
$ curl -L "$REPO/mill-dist-1.0.6-mill.sh" -o mill
$ chmod +x mill
18.4.bash
You can then use ./mill --bsp-install to generate a config that can be imported into either IntelliJ or VS Code, echo "1.0.6" > .mill-version to pin the Mill version, ./mill __.compile to compile everything, or ./mill show sync.assembly to create an executable that you can run. This is similar to what we saw in Chapter 17: Multi-Process Applications.
Next, let us define the shared code and messages that will be passed between the
Agent and the SyncActor over the wire:
shared/src/Rpc.scala
package sync
import upickle.{ReadWriter, readwriter}

given subPathRw: ReadWriter[os.SubPath] =
  readwriter[String].bimap[os.SubPath](_.toString, os.SubPath(_))

enum Rpc derives ReadWriter:
  case StatPath(path: os.SubPath)
  case WriteOver(src: Array[Byte], path: os.SubPath)

object Rpc:
  case class StatInfo(p: os.SubPath, fileHash: Option[Int]) derives ReadWriter
18.5.scala
The main things to note are:
enum Rpc, which represents all messages that can be sent from
SyncActor to Agent. There are two: case class StatPath asking for the
metadata of a path in the destination folder, and case class WriteOver
telling it to place a file's contents at a particular path
case class StatInfo, which represents the metadata about a particular path
that the Agent will return to the SyncActor in response to the StatPath
command. For now, it just contains the hash of the file contents at that path
(if any) and the path itself
given subPathRw, which we use to make os.SubPaths serializable.
This is similar to what we did in Chapter 17: Multi-Process Applications; a
quick round-trip is sketched below
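To see subPathRw at work, here is a quick round-trip you could try in the REPL with the shared module on the classpath. This is just a sketch; the assert shows that the path survives serialization unchanged:

import sync.given // bring subPathRw into scope

val p = os.SubPath("hello/world.txt")
val bin = upickle.writeBinary(p) // serialized as the plain string "hello/world.txt"
assert(upickle.readBinary[os.SubPath](bin) == p)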
We re-use the same wire protocol, and send and receive methods, that we used
earlier in Chapter 17: Multi-Process Applications:
shared/src/Shared.scala
package sync
import upickle.{Reader, Writer}

object Shared:
  def send[T: Writer](out: java.io.DataOutputStream, msg: T): Unit =
    val bytes = upickle.writeBinary(msg)
    out.writeInt(bytes.length)
    out.write(bytes)
    out.flush()

  def receive[T: Reader](in: java.io.DataInputStream) =
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    upickle.readBinary[T](buf)

  def hashPath(p: os.Path) =
    if !os.isFile(p) then None
    else Some(java.util.Arrays.hashCode(os.read.bytes(p)))
18.6.scala
This implements the same wire protocol we saw earlier: we send messages by first
sending the length of the message as a 4-byte integer, followed by the
MessagePack-serialized binary blob that uPickle converts the message classes
into. We also define a shared hashPath method, which hashes the contents of a
file at a particular path, if one exists. This is used in both Agent and
SyncActor, since we need to compare the hash of the file in source and
destination folders to decide if it needs to be copied.
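Because send and receive depend only on DataOutputStream and DataInputStream, we can exercise the framing without spawning a subprocess at all. Here is a small sketch that round-trips a message through in-memory streams:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Write a StatPath message into a buffer - a 4-byte length prefix followed
// by the MessagePack payload - then read it back out of the same bytes
val bout = new ByteArrayOutputStream()
Shared.send(new DataOutputStream(bout), Rpc.StatPath(os.sub / "hello.txt"))

val in = new DataInputStream(new ByteArrayInputStream(bout.toByteArray))
assert(Shared.receive[Rpc](in) == Rpc.StatPath(os.sub / "hello.txt"))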
Next, we will define our Agent. This will run on the destination side of our
file synchronizer (possibly across the network), receive commands from the
sync process and respond with results:
agent/src/Agent.scala
package sync

object Agent:
  @main def run(): Unit =
    val input = java.io.DataInputStream(System.in)
    val output = java.io.DataOutputStream(System.out)
    while true do
      try
        val rpc = Shared.receive[Rpc](input)
        System.err.println("Agent handling: " + rpc)
        rpc match
          case Rpc.StatPath(path) =>
            Shared.send(output, Rpc.StatInfo(path, Shared.hashPath(os.pwd / path)))
          case Rpc.WriteOver(bytes, path) =>
            os.remove.all(os.pwd / path)
            os.write.over(os.pwd / path, bytes, createFolders = true)
      catch case e: java.io.EOFException => System.exit(0)
18.7.scala
Our Agent is somewhat simpler than the one in Chapter 17, due to the smaller
set of Rpc messages it needs to handle: our single Rpc.StatPath message is
enough to replace the trio of Rpc.IsDir, Rpc.Exists, and Rpc.ReadBytes that we
were using before.
We use System.err.println to add some logging of what the Agent is doing. The
Agent subprocess inherits the stderr of the host process by default, making it
an ideal channel to send debug logging or other diagnostics without interfering
with the main data exchange happening over System.in and System.out.
Lastly, we will define the main entry-point of our program, Sync.scala.
sync/src/Sync.scala
package sync

object Sync:
  def main(src0: String, dest0: String): Unit =
    val (src, dest) = (os.Path(src0, os.pwd), os.Path(dest0, os.pwd))
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
18.8.scala
We define a simple main method, parse the first two command-line args into
src and dest paths, and spawn the agent. This is again similar to what we
did in Chapter 17: Multi-Process Applications. Note that by default .spawn
forwards the standard error stream of the subprocess directly to the host
process' standard error, so the System.err.println logging output in
Agent.scala will show up in the console when Sync.scala is run.
Before we define the implementation of SyncActor, let us consider the messages
the actor will send and receive. As stated earlier, it will receive
ChangedPath messages from the os.watch thread, and Rpc.StatInfo responses
from the Agent.
sync/src/Sync.scala
  def main(src0: String, dest0: String): Unit =
    ...
+   enum Msg:
+     case ChangedPath(value: os.SubPath)
+     case AgentResponse(value: Rpc.StatInfo)
18.9.scala
We wrap the Rpc.StatInfos in an AgentResponse class to make it part of our
Msg hierarchy.
SyncActor is a castor.SimpleActor[Msg]. For now, we only have one
SyncActor, so we make it a singleton object for simplicity. SyncActor is
the only thing writing to agent.stdin, which is important because its
single-threaded nature ensures messages are written to agent.stdin one after
another and not all jumbled together.
sync/src/Sync.scala
+   import castor.Context.Simple.global
+   object SyncActor extends castor.SimpleActor[Msg]:
+     def run(msg: Msg): Unit =
+       println("SyncActor handling: " + msg)
+       msg match
+         case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
+         case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
+           val localHash = Shared.hashPath(src / p)
+           if localHash != remoteHash && localHash.isDefined then
+             Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
18.10.scala
ChangedPath messages are forwarded directly to the agent as StatPath
commands, while the Rpc.StatInfo responses are then used to decide whether or
not a file needs to be synced. If the hashes of the source and destination files
differ, and the file exists locally, we sync it using an Rpc.WriteOver message
containing the bytes of the file.
The last section of Sync.scala involves two threads we are spinning up:
sync/src/Sync.scala
+   val agentReader = Thread(() =>
+     while agent.isAlive() do
+       SyncActor.send(Msg.AgentResponse(Shared.receive[Rpc.StatInfo](agent.stdout.data)))
+   )
+   agentReader.start()
+   val watcher = os.watch.watch(
+     Seq(src),
+     onEvent = _.foreach(p => SyncActor.send(Msg.ChangedPath(p.subRelativeTo(src))))
+   )
+   Thread.sleep(Long.MaxValue)
18.11.scala
agentReader reads StatInfo responses from agent.stdout, and sends them
to SyncActor
watcher reads file changes from the filesystem, and sends the ChangedPaths
to SyncActor
Both of these threads are running concurrently. However, because each thread's
input is only read by that thread, and each thread sends its output to an actor,
we can be confident that we won't have any multithreading issues here. After
starting the two threads, we put the main method to sleep via Thread.sleep
while the syncer runs in the background, so the program does not terminate
prematurely.
The complete implementation of Sync.scala is as follows:
sync/src/Sync.scala
package sync

object Sync:
  def main(src0: String, dest0: String): Unit =
    val (src, dest) = (os.Path(src0, os.pwd), os.Path(dest0, os.pwd))
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)

    enum Msg:
      case ChangedPath(value: os.SubPath)
      case AgentResponse(value: Rpc.StatInfo)

    import castor.Context.Simple.global
    object SyncActor extends castor.SimpleActor[Msg]:
      def run(msg: Msg): Unit =
        println("SyncActor handling: " + msg)
        msg match
          case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
          case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
            val localHash = Shared.hashPath(src / p)
            if localHash != remoteHash && localHash.isDefined then
              Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))

    val agentReader = Thread(() =>
      while agent.isAlive() do
        SyncActor.send(Msg.AgentResponse(Shared.receive[Rpc.StatInfo](agent.stdout.data)))
    )
    agentReader.start()
    val watcher = os.watch.watch(
      Seq(src),
      onEvent = _.foreach(p => SyncActor.send(Msg.ChangedPath(p.subRelativeTo(src))))
    )
    Thread.sleep(Long.MaxValue)
18.12.scala
We can test out the syncer we have written so far by making two folders, running
the sync.assembly executable on them, and then in another terminal creating a
bunch of files and folders in the src/ folder to verify that they appear in
dest/:
First Terminal | Second Terminal
(a side-by-side terminal session: the syncer runs in the first terminal while files and folders are created in the second)
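The exact commands are not important, but a representative session might look something like the following. The assembly path here is illustrative; use whatever path ./mill show sync.assembly prints:

$ ./mill show sync.assembly          # first terminal: build the syncer
$ mkdir src dest
$ out/sync/assembly.dest/out.jar src dest

$ mkdir src/hello                    # second terminal: create some files
$ echo "Hello World" > src/hello/world.txt
$ echo "I am Cow" > src/moo
$ cat dest/hello/world.txt           # the change has been synced over
Hello World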
As you can see, we can create files and folders while the syncer is running, and
they are immediately synced over. The one-file-at-a-time implementation means
that we only need to sync the files that have changed, and do not need to waste
time on files that have not: this is a big advantage for interactive use cases
where you are syncing lots of small changes. In our original terminal where the
syncer is running, we can see the logging output from SyncActor as it
processes each file and folder we created:
SyncActor handling: ChangedPath(hello)
Agent handling: StatPath(hello)
SyncActor handling: AgentResponse(StatInfo(hello,None))
SyncActor handling: ChangedPath(hello/world.txt)
Agent handling: StatPath(hello/world.txt)
SyncActor handling: AgentResponse(StatInfo(hello/world.txt,None))
Agent handling: WriteOver([B@7995092a,hello/world.txt)
SyncActor handling: ChangedPath(moo)
Agent handling: StatPath(moo)
SyncActor handling: AgentResponse(StatInfo(moo,None))
Agent handling: WriteOver([B@76908cc0,moo)
18.15.output
If we continue making changes in the source folder, our program will pick them
up via os.watch and synchronize them over to the destination. Note that we do
not need to handle the case where a file changes in the middle of being synced
over: such a file change would result in another event being sent to os.watch,
which would be processed after the current message processing is complete. Even
if the SyncActor logic misbehaves if a file changes underneath it, we can
expect a future ChangedPath message to arrive and give SyncActor a chance to
sync the file over correctly.
We now have a simple real-time file synchronizer which can continuously sync
changes from the source folder to the destination. We use the single-threaded
nature of our SyncActor to help manage our concurrent system with data coming
in from both filesystem events as well as agent responses.
However, the fact that SyncActor processes messages one at a time in a
single-threaded fashion is both a blessing and a curse: this means that the
actions of SyncActor cannot happen in parallel. We will now look at breaking
up SyncActor into two smaller actors, in order to get some pipeline
parallelism in between different stages in the file sync pipeline.
SyncActor has two main tasks that are taking up all of its time:
The two Shared.send calls, which send commands and data over to the agent
process
The Shared.hashPath call, which hashes the bytes of a local file, compares
it to the destination hash we received from agent, and decides whether to
copy the file contents over
While these two operations are currently running in the same SyncActor, they
could be pipelined: sending commands over the network shouldn't get in the way
of hashing and comparing local files, and vice versa. Luckily this sort of
pipelining is exactly what Actors are good at!
Thus, we will introduce another actor into the pipeline, HashActor: it will
hash local files and compare them against the remote file hash, so that
SyncActor only needs to send messages over the network. The two actors can
then run concurrently.
We can do this by modifying the SyncActor and agentReader we defined earlier
as follows:
sync/src/Sync.scala
    enum Msg:
      case ChangedPath(value: os.SubPath)
-     case AgentResponse(value: Rpc.StatInfo)
+     case HashStatInfo(localHash: Option[Int], value: Rpc.StatInfo)

    import castor.Context.Simple.global
    object SyncActor extends castor.SimpleActor[Msg]:
      def run(msg: Msg): Unit =
        println("SyncActor handling: " + msg)
        msg match
          case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
-         case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
-           val localHash = Shared.hashPath(src / p)
+         case Msg.HashStatInfo(localHash, Rpc.StatInfo(p, remoteHash)) =>
            if localHash != remoteHash && localHash.isDefined then
              Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
18.16.scala
sync/src/Sync.scala
+   object HashActor extends castor.SimpleActor[Rpc.StatInfo]:
+     def run(msg: Rpc.StatInfo): Unit =
+       println("HashActor handling: " + msg)
+       val localHash = Shared.hashPath(src / msg.p)
+       SyncActor.send(Msg.HashStatInfo(localHash, msg))
18.17.scala
sync/src/Sync.scala
    val agentReader = Thread(() =>
      while agent.isAlive() do
-       SyncActor.send(Msg.AgentResponse(Shared.receive[Rpc.StatInfo](agent.stdout.data)))
+       HashActor.send(Shared.receive[Rpc.StatInfo](agent.stdout.data))
    )
18.18.scala
Here, we rename AgentResponse to HashStatInfo, and give it a localHash:
Option[Int] field. In agentReader, instead of sending the StatInfo directly
to SyncActor, we send it to our new HashActor. HashActor computes the
localHash before bundling it together with the original StatInfo in a
HashStatInfo to send to SyncActor.
Thus, we now have the slow hashPath call being run in a separate
castor.SimpleActor from the slow Shared.send calls. That ensures that they
can run in parallel, giving us a nice pipeline parallelism while the use of
castor.SimpleActor helps us avoid needing to think about race conditions or
other multi-threading related issues.
We can test this by clearing out the src/ and dest/ folders we populated
earlier and repeating the manual test (18.3). We should again
see that any files we create in src/ are quickly synced over to dest/. The
complete implementation of the pipelined version of Sync.scala is presented
below. The other files build.mill, Agent.scala, Shared.scala, and
Rpc.scala are unchanged from the non-pipelined implementation we saw earlier:
sync/src/Sync.scala
package sync

object Sync:
  def main(src0: String, dest0: String): Unit =
    val (src, dest) = (os.Path(src0, os.pwd), os.Path(dest0, os.pwd))
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)

    enum Msg:
      case ChangedPath(value: os.SubPath)
      case HashStatInfo(localHash: Option[Int], value: Rpc.StatInfo)

    import castor.Context.Simple.global
    object SyncActor extends castor.SimpleActor[Msg]:
      def run(msg: Msg): Unit =
        println("SyncActor handling: " + msg)
        msg match
          case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
          case Msg.HashStatInfo(localHash, Rpc.StatInfo(p, remoteHash)) =>
            if localHash != remoteHash && localHash.isDefined then
              Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))

    object HashActor extends castor.SimpleActor[Rpc.StatInfo]:
      def run(msg: Rpc.StatInfo): Unit =
        println("HashActor handling: " + msg)
        val localHash = Shared.hashPath(src / msg.p)
        SyncActor.send(Msg.HashStatInfo(localHash, msg))

    val agentReader = Thread(() =>
      while agent.isAlive() do
        HashActor.send(Shared.receive[Rpc.StatInfo](agent.stdout.data))
    )
    agentReader.start()
    val watcher = os.watch.watch(
      Seq(src),
      onEvent = _.foreach(p => SyncActor.send(Msg.ChangedPath(p.subRelativeTo(src))))
    )
    Thread.sleep(Long.MaxValue)
18.19.scala
We can test this pipelined implementation the same way we tested our first
actor-based file synchronizer: creating an assembly, creating two empty
folders src and dest, and starting the sync. If we create files and folders
in src/ in quick succession, e.g. in the same shell command, we will see that
the logging from SyncActor, Agent, and HashActor becomes interleaved as
they all run in parallel:
$ mkdir src/hello; echo "Hello World" > src/hello/world.txt; echo "I am Cow" > src/moo
SyncActor handling: ChangedPath(hello)
SyncActor handling: ChangedPath(hello/world.txt)
SyncActor handling: ChangedPath(moo)
Agent handling: StatPath(hello)
Agent handling: StatPath(hello/world.txt)
Agent handling: StatPath(moo)
HashActor handling: StatInfo(hello,None)
SyncActor handling: HashStatInfo(None,StatInfo(hello,None))
HashActor handling: StatInfo(hello/world.txt,None)
HashActor handling: StatInfo(moo,None)
SyncActor handling: HashStatInfo(Some(-1262502777),StatInfo(hello/world.txt,None))
SyncActor handling: HashStatInfo(Some(-15566917),StatInfo(moo,None))
Agent handling: WriteOver([B@70be0a2b,hello/world.txt)
Agent handling: WriteOver([B@49b0b76,moo)
18.20.output
While these small tests do not really benefit from the pipeline parallelism our two actors give us, if you create a large number of files quickly you will see that the pipelined file syncer runs significantly faster than our earlier single-actor implementation.
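If you want a purely synthetic load, a throwaway script along the following lines (hypothetical, not part of this chapter's build) creates a thousand small files for the pipeline to chew through. Run it from the folder containing src/ while the syncer is watching:

// generate.scala, a hypothetical helper script
//> using dep com.lihaoyi::os-lib:0.11.6

@main def generate(): Unit =
  for i <- 0 until 1000 do
    // each write triggers a ChangedPath event, keeping both actors busy
    os.write.over(os.pwd / "src" / s"file-$i.txt", s"contents of file $i", createFolders = true)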
As a convenient way of creating a large number of files quickly, try going into
the src/ folder and cloning a small Git repository such as
https://github.com/com-lihaoyi/fansi. This will create dozens of files as the
repository is downloaded. Our file synchronizer will begin working immediately
as the first file is created, and once complete we should be able to run git
show or git log in the dest/fansi/ folder and have them print the same output as if
we ran them in src/fansi/:
$ cd src
$ git clone https://github.com/com-lihaoyi/fansi
Cloning into 'fansi'...
$ cd ../dest/fansi
$ git show
commit 21a91b22366aa2761eb0b284049aa2a0eec8e565 (HEAD -> master, origin/master)
Merge: ad7aa6b a723c95
Author: Li Haoyi <haoyi.sg@gmail.com>
Date: Mon Mar 2 06:49:15 2020 +0800
Merge pull request #23 from lolgab/add-scala-native
Add support for Scala Native 0.3 and 0.4.0-M2
18.21.bash
This demonstrates that even though we are creating files on disk while the file synchronizer is in progress, it is able to keep up with the ongoing changes and eventually bring the destination folder up to date with the contents of the source folder.
In this chapter, we took the batch-style file synchronizer we wrote in Chapter 17: Multi-Process Applications and turned it into a real-time file synchronizer that can incrementally synchronize two folders. We are now processing only the files that have changed, and do not need to redundantly scan files that have not.
In order to manage the constant stream of messages we are getting from both
os.watch and our agent subprocess, we used a castor.SimpleActor to ensure
that the messages are processed in a single-threaded fashion. We then split our
monolithic SyncActor into two: SyncActor and HashActor, allowing the
process of hashing local files and sending commands to the agent to happen in
parallel.
This file syncing program is still relatively simple: we do not handle deletions, we buffer file contents entirely in-memory, and we do not perform any sub-file-level syncing of large files. In addition, the code as-written only performs incremental syncing, and does not synchronize files that were present when the application starts. Adding these additional features is left as an exercise to the reader.
Hopefully this concise example has given you a feel for how an actor's single-threaded nature and the ability to chain them together into parallel pipelines are useful in solving real-world problems in an elegant way. In larger event-driven systems, your messages may be stored in some kind of shared queue service, and instead of actors you may have entire servers, but the same principles apply. This event-driven approach can be used in all sorts of scenarios where we have multiple processes we need to coordinate in a highly concurrent environment.
Exercise: Modify the pipelined file synchronizer to make it support synchronizing folders which start off with files and subfolders that need to be copied over.
See example 18.3 - InitialFiles
Exercise: Modify the pipelined file synchronizer to make it hash local files in
parallel, using Futures. Note that although different files can be hashed
and processed independently, we need to ensure that events related to each
individual file are processed in order.
Exercise: Modify the pipelined file synchronizer to support deletes: if a user deletes a file from the source folder, the syncer should delete the corresponding file from the destination folder.
See example 18.5 - Deletes
Exercise: Assuming the file synchronizer is the only process writing to the destination
folder, we should not need to ask agent what's on disk: the only files in the
destination are the files that SyncActor put there. Modify the pipelined
file synchronizer to keep track of the hashes of synchronized files locally,
so we can avoid exchanging StatPath and StatInfo messages for files the
syncer sent over itself.