18

Building a Real-time File Synchronizer


18.1 Watching for Changes
18.2 Real-time Syncing with Actors
18.3 Testing the Syncer
18.4 Pipelined Real-time Syncing
18.5 Testing the Pipelined Syncer

object SyncActor extends castor.SimpleActor[Msg]:
  def run(msg: Msg): Unit = msg match
    case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
    case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
      val localHash = Shared.hashPath(src / p)
      if localHash != remoteHash && localHash.isDefined then
        Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
18.1.scala

Snippet 18.1: an actor used as part of our real-time file synchronizer

In this chapter, we will write a file synchronizer that can keep the destination folder up to date even as the source folder changes over time. This chapter serves as a capstone project, tying together concepts from Chapter 17: Multi-Process Applications and Chapter 16: Message-based Parallelism with Actors.

The techniques in this chapter form the basis for "event-driven" architectures, which are common in many distributed systems. Real-time file synchronization is a difficult problem, and we will see how we can use the Scala language and libraries to approach it in an elegant and understandable way.

The file synchronizers we have seen in previous chapters were fundamentally batch processes. They assume all the files on disk are static, read all files in the source and destination folders, compare them, and decide which files they need to sync over. Both the single-process and multi-process versions of our file synchronizer worked the same way, except in the multi-process version we had to make RPCs to the agent process to perform actions such as querying or writing files to the destination folder.

This batch syncing works well for some usage patterns - such as one-off or nightly filesystem backups - but less well for folders with many small changes you want to sync over quickly. In such a scenario, our batch syncer would repeatedly walk the entire folder and scan large numbers of unchanged files only to perform a small number of useful changes. This can greatly slow down the process of syncing one small change from the source folder to the destination.

18.1 Watching for Changes

The key to converting our batch file syncer into an efficient real-time version is the os.watch API:

os.watch.watch(roots: Seq[os.Path], onEvent: Set[os.Path] => Unit): Unit

os.watch allows you to register an onEvent listener on one or more root folders: every time a file or folder within one of those folders changes, onEvent is called with the set of paths that changed. This allows you to efficiently detect changes within a folder, without needing to repeatedly scan the contents of files and folders to see if anything changed.

os.watch uses FSEvents on macOS, and relies on the JVM's builtin support for inotify on Linux or WSL2. At time of writing, it does not support watching for file changes on Windows.

You can import this API in the Scala CLI REPL to try it out, registering a watch and then performing filesystem actions to see the watch get triggered.

$ scala repl --dep com.lihaoyi::os-lib-watch:0.11.5

> os.makeDir(os.pwd / "out")

> os.watch.watch(
    Seq(os.pwd / "out"),
    paths => println("paths changed: " + paths.mkString(", "))
  )

> os.write(os.pwd / "out/i am", "cow")
paths changed: /Users/lihaoyi/test/out/i am

> os.move(os.pwd / "out/i am", os.pwd / "out/hear me")
paths changed: /Users/lihaoyi/test/out/i am, /Users/lihaoyi/test/out/hear me
18.2.scala

Note that os.watch spawns a separate thread to watch the filesystem and call the onEvent callback. Thus you need to make sure the onEvent callback is safe to run in a multi-threaded environment.
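
For example, if you want to collect events for later processing instead of handling them inline, one simple thread-safe approach (a small sketch, not part of our synchronizer) is to push the changed paths onto a concurrent queue:

import java.util.concurrent.LinkedBlockingQueue

val changedPaths = new LinkedBlockingQueue[os.Path]()

// onEvent runs on os.watch's background thread, so it should only touch
// thread-safe data structures such as this queue
os.watch.watch(Seq(os.pwd / "out"), paths => paths.foreach(changedPaths.put))

// elsewhere, another thread can drain the queue at its own pace:
// val next = changedPaths.take()

Later in this chapter we will instead forward the events into an actor, which gives us the same thread-safety with less manual plumbing.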

18.2 Real-time Syncing with Actors

18.2.1 Architecture

At a high level, our real-time file synchronizer will be architected as follows:

[Diagram: within the Main Process, os.watch sends (1) ChangedPath messages to SyncActor; SyncActor sends (2) StatPath to the Agent in the Sub Process; the Agent replies with (3) StatInfo; and SyncActor sends (4) WriteOver back to the Agent.]
  1. First, we feed the results of os.watch's onEvent callback into SyncActor. This is an actor that processes incoming messages one at a time

  2. Next, SyncActor will send a StatPath message to the Agent to ask for the hash for the file at a particular path

  3. Agent will query the destination folder for the hash of that path, and return it as a StatInfo message to SyncActor

  4. SyncActor compares the source and destination hashes. If they differ, SyncActor sends WriteOver messages to Agent, containing the contents of files we wish to copy over

Rather than treating the file synchronizer as a one-off program that reads data, performs some action, and then exits, we are treating it as a pipeline: for every file change in the source folder, it will go through these steps. Thus it doesn't matter if the files continue changing while our sync is in progress, or if more files change after the sync is complete: we can feed those file changes through the same pipeline and bring the destination folder up to date.

The insight here is that the various files we are syncing are mostly independent, but steps like sending messages to the Agent need to happen in a single-threaded fashion, and updates to any particular file need to have their order preserved. This makes it a good problem to model using Actors.

Note that the fact that SyncActor is an actor handling messages one after another is important. SyncActor is receiving messages from two different sources concurrently - both os.watch and Agent - while also executing its own logic. Making it an actor ensures that these messages are processed sequentially, letting us avoid thinking about race conditions, memory models and other multi-threading-related issues.
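
As a quick refresher of the castor API from Chapter 16, a castor.SimpleActor queues up messages sent to it from any thread and processes them one at a time in its run method. A minimal sketch (a toy LogActor, not part of the synchronizer) looks like this:

import castor.Context.Simple.global

object LogActor extends castor.SimpleActor[String]:
  // run is only ever executed by one thread at a time, in the order the
  // messages arrived, so no locking is needed inside it
  def run(msg: String): Unit = println("logging: " + msg)

// .send can be called safely from any thread, e.g. an os.watch callback
LogActor.send("hello")
LogActor.send("world")

SyncActor below follows exactly this pattern, just with a richer Msg type.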

18.2.2 Build Configuration

First, let's define our build.mill file:

build.mill
//| mill-version: 1.0.6
import mill.*, scalalib.*
trait SyncModule extends ScalaModule:
  def scalaVersion = "3.7.3"
  def mvnDeps = Seq(
    mvn"com.lihaoyi::upickle:4.4.2",
    mvn"com.lihaoyi::os-lib:0.11.6",
    mvn"com.lihaoyi::os-lib-watch:0.11.5",
    mvn"com.lihaoyi::castor:0.3.0"
  )
object sync extends SyncModule:
  def moduleDeps = Seq(shared)
  def resources = Task{
    os.copy(agent.assembly().path, Task.dest / "agent.jar")
    super.resources() ++ Seq(PathRef(Task.dest))
  }
  object test extends ScalaTests with TestModule.Utest:
    def mvnDeps = Seq(mvn"com.lihaoyi::utest:0.9.4")
object agent extends SyncModule:
  def moduleDeps = Seq(shared)
object shared extends SyncModule
18.3.scala

This is the configuration used by the Mill build tool to compile our code. It defines three modules: sync, agent, and shared. The configuration is mostly similar to that used in Chapter 17: Multi-Process Applications, except for the addition of two external dependencies:

  • com.lihaoyi::os-lib-watch:0.11.5: this provides the filesystem-watching functionality we will use to detect changes in the source folder we need to sync to the destination

  • com.lihaoyi::castor:0.3.0: this is a small actor library, introduced in Chapter 16: Message-based Parallelism with Actors, that we will use to set up our actor pipelines

Next, we need to download the Mill launcher:

$ REPO=https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/1.0.6

$ curl -L "$REPO/mill-dist-1.0.6-mill.sh" -o mill

$ chmod +x mill
18.4.bash

You can then use ./mill mill.bsp.BSP/install to generate a config that can be imported into either IntelliJ or VS Code, echo "1.0.6" > .mill-version to pin the Mill version, ./mill __.compile to compile everything, or ./mill show sync.assembly to create an executable that you can run. This is similar to what we saw in Chapter 17: Multi-Process Applications.

18.2.3 Shared Code

Next, let us define the shared code and messages that will be passed between the Agent and the SyncActor over the wire:

shared/src/Rpc.scala
package sync
import upickle.{ReadWriter, readwriter}

given subPathRw: ReadWriter[os.SubPath] =
  readwriter[String].bimap[os.SubPath](_.toString, os.SubPath(_))

enum Rpc derives ReadWriter:
  case StatPath(path: os.SubPath)
  case WriteOver(src: Array[Byte], path: os.SubPath)

object Rpc:
  case class StatInfo(p: os.SubPath, fileHash: Option[Int]) derives ReadWriter
18.5.scala

The main things to note are:

  • enum Rpc, which represents all messages that can be sent from SyncActor to Agent. There are two cases: StatPath, which asks for the metadata of a path in the destination folder, and WriteOver, which tells it to place a file's contents at a particular path

  • case class StatInfo, which represents the metadata about a particular path that the Agent will return to the SyncActor in response to the StatPath command. For now, it just contains the hash of the file contents at that path (if any) and the path itself

  • given subPathRw, which we use to make os.SubPaths serializable. This is similar to what we did in Chapter 17: Multi-Process Applications
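
If you want to sanity-check the wire format, here is a hypothetical REPL session (assuming the shared module is on the classpath) that round-trips an Rpc message through uPickle's binary serialization:

> import sync.Rpc

> val bytes = upickle.writeBinary[Rpc](Rpc.StatPath(os.sub / "hello" / "world.txt"))

> upickle.readBinary[Rpc](bytes)
// StatPath(hello/world.txt)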

We re-use the same wire protocol, and send and receive methods, that we used earlier in Chapter 17: Multi-Process Applications:

shared/src/Shared.scala
package sync
import upickle.{Reader, Writer}
object Shared:
  def send[T: Writer](out: java.io.DataOutputStream, msg: T): Unit =
    val bytes = upickle.writeBinary(msg)
    out.writeInt(bytes.length)
    out.write(bytes)
    out.flush()
  def receive[T: Reader](in: java.io.DataInputStream) =
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    upickle.readBinary[T](buf)
  def hashPath(p: os.Path) =
    if !os.isFile(p) then None
    else Some(java.util.Arrays.hashCode(os.read.bytes(p)))
18.6.scala

This implements the same wire protocol we saw earlier: we send messages by first sending the length of the message as a 4-byte integer, followed by the MessagePack-serialized binary blob that uPickle converts the message classes into. We also define a shared hashPath method, which hashes the contents of a file at a particular path, if one exists. This is used in both Agent and SyncActor, since we need to compare the hashes of the file in the source and destination folders to decide if it needs to be copied.
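
To make the framing concrete, here is a small illustration (not additional project code) of what a single message looks like on the wire, assuming the definitions above:

// what Shared.send(out, Rpc.StatPath(os.sub / "moo")) writes to the stream:
val payload = upickle.writeBinary[Rpc](Rpc.StatPath(os.sub / "moo"))
// [4 bytes: payload.length, big-endian][payload.length bytes of MessagePack data]
// DataOutputStream.writeInt is big-endian, and readInt/readFully on the
// receiving side mirror it, so each message is unambiguously delimited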

18.2.4 Agent

Next, we will define our Agent. This will run on the destination side of our file synchronizer (possibly across the network), receive commands from the sync process and respond with results:

agent/src/Agent.scala
package sync
object Agent:
  @main def run(): Unit =
    val input = java.io.DataInputStream(System.in)
    val output = java.io.DataOutputStream(System.out)
    while true do try
      val rpc = Shared.receive[Rpc](input)
      System.err.println("Agent handling: " + rpc)
      rpc match
        case Rpc.StatPath(path) =>
          Shared.send(output, Rpc.StatInfo(path, Shared.hashPath(os.pwd / path)))

        case Rpc.WriteOver(bytes, path) =>
          os.remove.all(os.pwd / path)
          os.write.over(os.pwd / path, bytes, createFolders = true)
    catch case e: java.io.EOFException => System.exit(0)
18.7.scala

Our Agent is somewhat simpler than the one in Chapter 17, due to the smaller set of Rpc messages it needs to handle: our Rpc.StatPath message is enough to replace the trio of Rpc.IsDir, Rpc.Exists and Rpc.ReadBytes that we were using before.

We use System.err.println to add some logging of what the Agent is doing. The Agent subprocess inherits the stderr of the host process by default, making it an ideal channel to send debug logging or other diagnostics without interfering with the main data exchange happening over System.in and System.out.

18.2.5 Sync

Lastly, we will define the main entry-point of our program, Sync.scala.

18.2.5.1 Initializing the Agent

sync/src/Sync.scala
package sync
object Sync:
  @main def run(src0: String, dest0: String): Unit =
    val (src, dest) = (os.Path(src0, os.pwd), os.Path(dest0, os.pwd))
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
18.8.scala

We define a simple entry-point method that parses the two command-line arguments into src and dest paths and spawns the agent. This is again similar to what we did in Chapter 17: Multi-Process Applications. Note that by default os.spawn forwards the standard error stream of the subprocess directly to the host process' standard error, so the System.err.println logging output in Agent.scala will show up in the console when Sync.scala is run.
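
If you want to make that behavior explicit, or capture the agent's diagnostics somewhere else, os.spawn accepts a stderr redirect. A hypothetical variation, not used in our build:

// keep the default explicitly: the agent's stderr goes straight to our console
val agent = os.spawn(cmd = agentExecutable, cwd = dest, stderr = os.Inherit)
// or send the agent's logging to a file instead:
// val agent = os.spawn(cmd = agentExecutable, cwd = dest, stderr = dest / "agent.log")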

18.2.5.2 SyncActor Messages

Before we define the implementation of SyncActor, let us consider the messages the actor will send and receive. As stated earlier, it will receive ChangedPath messages from the os.watch thread, and Rpc.StatInfo responses from the Agent.

sync/src/Sync.scala
   @main def run(src0: String, dest0: String): Unit =
     ...
+    enum Msg:
+      case ChangedPath(value: os.SubPath)
+      case AgentResponse(value: Rpc.StatInfo)
18.9.scala

We wrap the Rpc.StatInfos in an AgentResponse class to make it part of our Msg hierarchy.

18.2.5.3 SyncActor

SyncActor is a castor.SimpleActor[Msg]. For now, we only have one SyncActor, so we make it a singleton object for simplicity. SyncActor is the only thing writing to agent.stdin, which is important because its single-threaded nature ensures messages are written to agent.stdin one after another and not all jumbled together.

sync/src/Sync.scala
+    import castor.Context.Simple.global
+    object SyncActor extends castor.SimpleActor[Msg]:
+      def run(msg: Msg): Unit =
+        println("SyncActor handling: " + msg)
+        msg match
+          case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
+          case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
+            val localHash = Shared.hashPath(src / p)
+            if localHash != remoteHash && localHash.isDefined then
+              Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
18.10.scala

ChangedPath messages are forwarded directly to the agent as StatPath commands, while the Rpc.StatInfo responses are then used to decide whether or not a file needs to be synced. If the hashes of the source and destination files differ, and the file exists locally, we sync it using an Rpc.WriteOver message containing the bytes of the file.

18.2.5.4 agentReader and os.watch

The last section of Sync.scala involves two threads we are spinning up:

sync/src/Sync.scala
+    val agentReader = Thread(() =>
+      while agent.isAlive() do
+        SyncActor.send(Msg.AgentResponse(Shared.receive[Rpc.StatInfo](agent.stdout.data)))
+    )
+    agentReader.start()
+    val watcher = os.watch.watch(
+      Seq(src),
+      onEvent = _.foreach(p => SyncActor.send(Msg.ChangedPath(p.subRelativeTo(src))))
+    )
+    Thread.sleep(Long.MaxValue)
18.11.scala
  • agentReader reads StatInfo responses from agent.stdout, and sends them to SyncActor
  • watcher reads file changes from the filesystem, and sends the ChangedPaths to SyncActor

Both of these threads are running concurrently. However, because each thread's input is only read by that thread, and each thread sends its output to an actor, we can be confident that we won't have any multithreading issues here. After starting the two threads, we make the main method Thread.sleep while the syncer runs in the background so the program does not terminate prematurely.

The complete implementation of Sync.scala is as follows:

sync/src/Sync.scala
package sync
object Sync:
  @main def run(src0: String, dest0: String): Unit =
    val (src, dest) = (os.Path(src0, os.pwd), os.Path(dest0, os.pwd))
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
    enum Msg:
      case ChangedPath(value: os.SubPath)
      case AgentResponse(value: Rpc.StatInfo)
    import castor.Context.Simple.global
    object SyncActor extends castor.SimpleActor[Msg]:
      def run(msg: Msg): Unit =
        println("SyncActor handling: " + msg)
        msg match
          case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
          case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
            val localHash = Shared.hashPath(src / p)
            if localHash != remoteHash && localHash.isDefined then
              Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
    val agentReader = Thread(() =>
      while agent.isAlive() do
        SyncActor.send(Msg.AgentResponse(Shared.receive[Rpc.StatInfo](agent.stdout.data)))
    )

    agentReader.start()
    val watcher = os.watch.watch(
      Seq(src),
      onEvent = _.foreach(p => SyncActor.send(Msg.ChangedPath(p.subRelativeTo(src))))
    )
    Thread.sleep(Long.MaxValue)
18.12.scala

18.3 Testing the Syncer

We can test out the syncer we have written so far by making two folders, running the sync.assembly executable on them, and then in another terminal creating a bunch of files and folders in the src/ folder to verify that they appear in dest/:

First Terminal

$ ./mill show sync.assembly
...
"ref:v0:...:out/sync/assembly.dest/out.jar"

$ mkdir src dest

$ out/sync/assembly.dest/out.jar src dest
18.13.bash

Second Terminal

$ mkdir src/hello

$ echo "Hello World" > src/hello/world.txt

$ echo "I am Cow" > src/moo

$ find src -type f
src/moo
src/hello/world.txt

$ find dest -type f
dest/moo
dest/hello/world.txt

$ cat dest/hello/world.txt
Hello World

$ cat dest/moo
I am Cow
18.14.bash

As you can see, we can create files and folders while the syncer is running, and they are immediately synced over. The one-file-at-a-time implementation means that we only need to sync the files that have changed, and do not need to waste time on files that have not: this is a big advantage for interactive use cases where you are syncing lots of small changes. In our original terminal where the syncer is running, we can see the logging output from SyncActor as it processes each file and folder we created:

SyncActor handling: ChangedPath(hello)
Agent handling: StatPath(hello)
SyncActor handling: AgentResponse(StatInfo(hello,None))

SyncActor handling: ChangedPath(hello/world.txt)
Agent handling: StatPath(hello/world.txt)
SyncActor handling: AgentResponse(StatInfo(hello/world.txt,None))
Agent handling: WriteOver([B@7995092a,hello/world.txt)

SyncActor handling: ChangedPath(moo)
Agent handling: StatPath(moo)
SyncActor handling: AgentResponse(StatInfo(moo,None))
Agent handling: WriteOver([B@76908cc0,moo)
18.15.output
See example 18.1 - Simple

If we continue making changes in the source folder, our program will pick them up via os.watch and synchronize them over to the destination. Note that we do not need to handle the case where a file changes in the middle of being synced over: such a change causes os.watch to fire another event, which is processed after the current message is handled. Even if the SyncActor logic misbehaves when a file changes underneath it, we can expect a future ChangedPath message to arrive and give SyncActor a chance to sync the file over correctly.

18.4 Pipelined Real-time Syncing

We now have a simple real-time file synchronizer which can continuously sync changes from the source folder to the destination. We use the single-threaded nature of our SyncActor to help manage our concurrent system with data coming in from both filesystem events as well as agent responses.

However, the fact that SyncActor processes messages one at a time in a single-threaded fashion is both a blessing and a curse: it means that the actions of SyncActor cannot happen in parallel. We will now look at breaking up SyncActor into two smaller actors, in order to get some pipeline parallelism between the different stages of the file sync pipeline.

18.4.1 Pipelined Architecture

SyncActor has two main tasks that are taking up all of its time:

  1. The two Shared.send calls, which send commands and data over to the agent process

  2. The Shared.hashPath call, which hashes the bytes of a local file so that we can compare it to the destination hash we received from the agent and decide whether to copy the file contents over

While these two operations are currently running in the same SyncActor, they could be pipelined: sending commands over the network shouldn't get in the way of hashing and comparing local files, and vice versa. Luckily this sort of pipelining is exactly what Actors are good at!

Thus, we will introduce another actor in the pipeline, HashActor:

[Diagram: within the Main Process, os.watch sends (1) ChangedPath messages to SyncActor; SyncActor sends (2) StatPath to the Agent in the Sub Process; the Agent replies with (3) StatInfo to HashActor; HashActor sends (4) HashStatInfo to SyncActor; and SyncActor sends (5) WriteOver back to the Agent.]

HashActor will hash and compare local files with the remote file hash, so that SyncActor only needs to send messages over the network. The two actors can then run concurrently.

18.4.2 Pipelined Implementation

We can do this by modifying the SyncActor and agentReader we defined earlier as follows:

sync/src/Sync.scala
     enum Msg:
       case ChangedPath(value: os.SubPath)
-      case AgentResponse(value: Rpc.StatInfo)
+      case HashStatInfo(localHash: Option[Int], value: Rpc.StatInfo)
     import castor.Context.Simple.global
     object SyncActor extends castor.SimpleActor[Msg]:
       def run(msg: Msg): Unit =
         println("SyncActor handling: " + msg)
         msg match
           case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
-          case Msg.AgentResponse(Rpc.StatInfo(p, remoteHash)) =>
-            val localHash = Shared.hashPath(src / p)
+          case Msg.HashStatInfo(localHash, Rpc.StatInfo(p, remoteHash)) =>
             if localHash != remoteHash && localHash.isDefined then
               Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
18.16.scala
sync/src/Sync.scala
+    object HashActor extends castor.SimpleActor[Rpc.StatInfo]:
+      def run(msg: Rpc.StatInfo): Unit =
+        println("HashActor handling: " + msg)
+        val localHash = Shared.hashPath(src / msg.p)
+        SyncActor.send(Msg.HashStatInfo(localHash, msg))
18.17.scala
sync/src/Sync.scala
     val agentReader = Thread(() =>
       while agent.isAlive() do
-        SyncActor.send(Msg.AgentResponse(Shared.receive[Rpc.StatInfo](agent.stdout.data)))
+        HashActor.send(Shared.receive[Rpc.StatInfo](agent.stdout.data))
     )
18.18.scala

Here, we rename AgentResponse as HashStatInfo, and give it a localHash: Option[Int] field. In agentReader, instead of sending the StatInfo directly to SyncActor, we send it to our new HashActor. HashActor computes the localHash before bundling that together with the original StatInfo in a HashStatInfo to send to SyncActor.

Thus, the slow hashPath call now runs in a separate castor.SimpleActor from the slow Shared.send calls. This lets the two run in parallel, giving us pipeline parallelism, while the use of castor.SimpleActor still spares us from thinking about race conditions or other multi-threading related issues.

We can test this by clearing out the src/ and dest/ folders we populated earlier and repeating the manual test (18.3). We should again see that any files we create in src/ are quickly synced over to dest/. The complete implementation of the pipelined version of Sync.scala is presented below. The other files build.mill, Agent.scala, Shared.scala, and Rpc.scala are unchanged from the non-pipelined implementation we saw earlier:

sync/src/Sync.scala
package sync
object Sync:
  @main def run(src0: String, dest0: String): Unit =
    val (src, dest) = (os.Path(src0, os.pwd), os.Path(dest0, os.pwd))
    val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
    os.perms.set(agentExecutable, "rwx------")
    val agent = os.spawn(cmd = agentExecutable, cwd = dest)
    enum Msg:
      case ChangedPath(value: os.SubPath)
      case HashStatInfo(localHash: Option[Int], value: Rpc.StatInfo)

    import castor.Context.Simple.global
    object SyncActor extends castor.SimpleActor[Msg]:
      def run(msg: Msg): Unit =
        println("SyncActor handling: " + msg)
        msg match
          case Msg.ChangedPath(value) => Shared.send(agent.stdin.data, Rpc.StatPath(value))
          case Msg.HashStatInfo(localHash, Rpc.StatInfo(p, remoteHash)) =>
            if localHash != remoteHash && localHash.isDefined then
              Shared.send(agent.stdin.data, Rpc.WriteOver(os.read.bytes(src / p), p))
    object HashActor extends castor.SimpleActor[Rpc.StatInfo]:
      def run(msg: Rpc.StatInfo): Unit =
        println("HashActor handling: " + msg)
        val localHash = Shared.hashPath(src / msg.p)
        SyncActor.send(Msg.HashStatInfo(localHash, msg))
    val agentReader = Thread(() =>
      while agent.isAlive() do
        HashActor.send(Shared.receive[Rpc.StatInfo](agent.stdout.data))
    )
    agentReader.start()
    val watcher = os.watch.watch(
      Seq(src),
      onEvent = _.foreach(p => SyncActor.send(Msg.ChangedPath(p.subRelativeTo(src))))
    )
    Thread.sleep(Long.MaxValue)
18.19.scala
See example 18.2 - Pipelined

18.5 Testing the Pipelined Syncer

We can test this pipelined implementation the same way we tested our first actor-based file synchronizer: creating an assembly, creating two empty folders src and dest, and starting the sync. If we create files and folders in src/ in quick succession, e.g. in the same shell command, we will see that the logging from SyncActor, Agent, and HashActor becomes interleaved as they all run in parallel:

$ mkdir src/hello; echo "Hello World" > src/hello/world.txt; echo "I am Cow" > src/moo
SyncActor handling: ChangedPath(hello)
SyncActor handling: ChangedPath(hello/world.txt)
SyncActor handling: ChangedPath(moo)
Agent handling: StatPath(hello)
Agent handling: StatPath(hello/world.txt)
Agent handling: StatPath(moo)
HashActor handling: StatInfo(hello,None)
SyncActor handling: HashStatInfo(None,StatInfo(hello,None))
HashActor handling: StatInfo(hello/world.txt,None)
HashActor handling: StatInfo(moo,None)
SyncActor handling: HashStatInfo(Some(-1262502777),StatInfo(hello/world.txt,None))
SyncActor handling: HashStatInfo(Some(-15566917),StatInfo(moo,None))
Agent handling: WriteOver([B@70be0a2b,hello/world.txt)
Agent handling: WriteOver([B@49b0b76,moo)
18.20.output

While these small tests do not really benefit from the pipeline parallelism our two actors give us, if you create a large number of files quickly you will see that the pipelined file syncer runs significantly faster than our earlier single-actor implementation.
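
One quick way to generate such a burst of changes (a hypothetical test, not taken from the chapter's examples) is a shell loop that writes many small files into the source folder:

$ for i in $(seq 1 1000); do echo "file number $i" > src/file$i.txt; done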

Another convenient way of creating a large number of files quickly is to go into the src/ folder and clone a small Git repository such as https://github.com/com-lihaoyi/fansi. This will create dozens of files as the repository is downloaded. Our file synchronizer will begin working as soon as the first file is created, and once the clone is complete we should be able to run git show or git log inside the dest/fansi/ folder and have them print the same output as if we ran them in src/fansi/:

$ cd src

$ git clone https://github.com/com-lihaoyi/fansi
Cloning into 'fansi'...

$ cd ../dest/fansi

$ git show
commit 21a91b22366aa2761eb0b284049aa2a0eec8e565 (HEAD -> master, origin/master)
Merge: ad7aa6b a723c95
Author: Li Haoyi <haoyi.sg@gmail.com>
Date:   Mon Mar 2 06:49:15 2020 +0800

    Merge pull request #23 from lolgab/add-scala-native

    Add support for Scala Native 0.3 and 0.4.0-M2
18.21.bash

This demonstrates that even though we are creating files on disk while the file synchronizer is in progress, it is able to keep up with the ongoing changes and eventually bring the destination folder up to date with the contents of the source folder.

18.6 Conclusion

In this chapter, we took the batch-style file synchronizer we wrote in Chapter 17: Multi-Process Applications and turned it into a real-time file synchronizer that can incrementally synchronize two folders. We are now processing only the files that have changed, and do not need to redundantly scan files that have not.

In order to manage the constant stream of messages we are getting from both os.watch and our agent subprocess, we used a castor.SimpleActor to ensure that the messages are processed in a single-threaded fashion. We then split our monolithic SyncActor into two: SyncActor and HashActor, allowing the process of hashing local files and sending commands to the agent to happen in parallel.

This file syncing program is still relatively simple: we do not handle deletions, we buffer file contents entirely in-memory, and we do not perform any sub-file-level syncing of large files. In addition, the code as-written only performs incremental syncing, and does not synchronize files that were present when the application starts. Adding these additional features is left as an exercise to the reader.

Hopefully this concise example has given you a feel for how an actor's single-threaded nature and the ability to chain them together into parallel pipelines are useful in solving real-world problems in an elegant way. In larger event-driven systems, your messages may be stored in some kind of shared queue service, and instead of actors you may have entire servers, but the same principles apply. This event-driven approach can be used in all sorts of scenarios where we have multiple processes we need to coordinate in a highly concurrent environment.

Exercise: Modify the pipelined file synchronizer to make it support synchronizing folders which start off with files and subfolders that need to be copied over.

See example 18.3 - InitialFiles

Exercise: Modify the pipelined file synchronizer to make it hash local files in parallel, using Futures. Note that although different files can be hashed and processed independently, we need to ensure that events related to each individual file are processed in order.

See example 18.4 - ForkJoinHashing

Exercise: Modify the pipelined file synchronizer to support deletes: if a user deletes a file from the source folder, the syncer should delete the corresponding file from the destination folder.

See example 18.5 - Deletes

Exercise: Assuming the file synchronizer is the only process writing to the destination folder, we should not need to ask agent what's on disk: the only files in the destination are the files that SyncActor put there. Modify the pipelined file synchronizer to keep track of the hashes of synchronized files locally, so we can avoid exchanging StatPath and StatInfo messages for files the syncer sent over itself.

See example 18.6 - VirtualFileSystem
Discuss Chapter 18 online at https://www.handsonscala.com/discuss/18