Next-level backends with Rama: recommendation engine in 80 LOC

This is part of a series of posts exploring programming with Rama, covering interactive consumer apps, high-scale analytics, background processing, recommendation engines, and much more. This tutorial is self-contained, but for broader information about Rama and how it reduces the cost of building backends so much (up to 100x for large-scale backends), see our website.

Like all Rama applications, the example in this post requires very little code. It easily scales to millions of reads/writes per second, is ACID compliant, high performance, and fault-tolerant because Rama incrementally replicates all state. Deploying, updating, and scaling this application are all one-line CLI commands. No other infrastructure besides Rama is needed. Comprehensive monitoring of all aspects of runtime operation is built in.

In this post, I’ll explore building a recommendation engine that requires looking at a lot of information for an entity at once in order to compute recommendations. Algorithms like these cannot update recommendations incrementally. Instead, they occasionally recompute recommendations for each user.

The particular example I’ll show is the implementation of “who to follow” for Twitter, based on the version from our open-source Twitter-scale Mastodon implementation. “Who to follow” recommendations are computed by looking at who’s most followed by a user’s current follows and then filtering out any already followed by that user.

Code will be shown in both Clojure and Java, with the total code being about 80 lines for each implementation. You can download and play with the Clojure implementation in this repository or the Java implementation in this repository.

The implementation will work by recomputing the “who to follow” suggestions for a subset of users every 30 seconds. User IDs are 64-bit numbers, and the set of users chosen for an iteration consists of the user IDs that come directly after those from the previous iteration. Once the computation reaches the max user ID, it loops around and starts over from the beginning.

Indexed datastores in Rama, called PStates (“partitioned state”), are much more powerful and flexible than databases. Whereas databases have fixed data models, PStates can represent an infinite variety of data models because they’re built by composing a simpler primitive: data structures. PStates are distributed, durable, high-performance, and incrementally replicated. Each PState is fine-tuned to what the application needs, and an application makes as many PStates as needed. For this application, we need three PStates: one to represent the social graph, one to store the current “who to follow” recommendations for each user, and one to keep track of the starting user ID for the next iteration of computing recommendations.

Here’s how the social graph PState will be defined in the application:

Java:

topology.pstate(
  "$$follows",
  PState.mapSchema(Long.class,
                   PState.setSchema(Long.class).subindexed()));

Clojure:

(declare-pstate
  topology
  $$follows
  {Long (set-schema Long {:subindex? true})})

This declares the PState as a map of sets, with the keys and set values being 64-bit user IDs. The name of a PState always begins with $$ . For each user, it tracks the set of users they follow. The inner set is declared as “subindexed”, which instructs Rama to store the elements individually on disk rather than serialize the whole set as one value. Subindexing enables nested data structures to have billions of elements and still be read and written to extremely quickly.
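
As a concrete illustration of why subindexing matters, a single membership check on that inner set only needs to read one element rather than deserialize the entire set. Here’s a hedged sketch of such a check done from a PState client (obtained the same way as in the “Fetching recommendations” section at the end of this post), using hypothetical user IDs:

// Does user 1 follow user 2? The path navigates to user 1's subindexed follows
// set and runs a containment check on it, reading only that one element.
Boolean follows = followsPState.selectOne(Path.key(1L).view(Ops.CONTAINS, 2L));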

In the full Twitter implementation you would also materialize a PState tracking the set of followers for each user, but since this post is focused on computing “who to follow” recommendations I’ll omit that. Adding that would only be a few more lines of code.

Here’s the PState definition for tracking the latest recommendations for each user:

Java:

topology.pstate(
  "$$who-to-follow",
  PState.mapSchema(Long.class,
                   PState.listSchema(Long.class)));

Clojure:

(declare-pstate
  topology
  $$who-to-follow
  {Long [Long]})

This is simply a map from user ID to a list of recommended users. The inner list does not have to be subindexed because the algorithm will compute at most 300 recommendations for each user.

Finally, here’s the PState to keep track of the next user ID to start from for the next iteration of computing recommendations:

Java:

topology.pstate("$$next-id", Long.class);

Clojure:

(declare-pstate topology $$next-id Long)

The PState is just a single 64-bit number, since that’s the entirety of the information it needs to track. Because PStates are distributed, it actually contains one 64-bit number per partition of the PState. As you’ll see, that number will be a cursor into the colocated $$follows PState partition. The ability to take advantage of colocated PStates is a unique and exceptionally powerful aspect of Rama, so let’s take a look at how Rama distributes storage and computation before continuing with the “who to follow” implementation.

Rama concepts

A Rama application is called a “module”. In a module you define all the storage and implement all the logic needed for your backend. All Rama modules are event sourced, so all data enters through a distributed log in the module called a “depot”. Most of the work in implementing a module is coding “ETL topologies” which consume data from one or more depots to materialize any number of PStates.

Modules can have any number of depots, topologies, and PStates, and clients interact with a module by appending new data to a depot or querying PStates. Although event sourcing traditionally means that processing is completely asynchronous to the client doing the append, with Rama that’s optional. Because Rama is an integrated system, clients can specify that their appends should only return after all downstream processing and PState updates have completed.

A module deployed to a Rama cluster runs across any number of worker processes on any number of nodes, and a module is scaled by adding more workers. Internally, a module is broken up into “tasks”.

A “task” is a partition of a module. The number of tasks for a module is specified on deploy. A task contains one partition of every depot and PState for the module as well as a thread and event queue for running events on that task. A running event has access to all depot and PState partitions on that task. Each worker process has a subset of all the tasks for the module.

Coding a topology involves reading and writing to PStates, running business logic, and switching between tasks as necessary.

Let’s first code the part of the module that materializes the social graph. The first step is to define a depot to receive new follow events:

Java:

public class WhoToFollowModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*follows-depot", Depot.hashBy("from"));
  }
}

Clojure:

(defmodule WhoToFollowModule
  [setup topologies]
  (declare-depot setup *follows-depot (hash-by :from))
  )

This declares a Rama module called “WhoToFollowModule” with a depot called *follows-depot which will receive all new follows information. Objects appended to a depot can be any type. The second argument to the depot declaration is called the “depot partitioner” – more on that later.

To keep the example simple, the data appended to the depot will be defrecord objects for the Clojure version and HashMap objects for the Java version. To have a tighter schema on depot records you could instead use Thrift, Protocol Buffers, or a language-native tool for defining the types. Here’s the code that will be used to create depot data:

Java:

public static Map createFollow(long from, long to) {
  Map ret = new HashMap();
  ret.put("from", from);
  ret.put("to", to);
  return ret;
}

Clojure:

(defrecord Follow [from to])

Next, let’s begin defining the topology to consume data from the depot and materialize the PState. Here’s the declaration of the topology with the PState:

Java:

public class WhoToFollowModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*follows-depot", Depot.hashBy("from"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate(
      "$$follows",
      PState.mapSchema(Long.class,
                       PState.setSchema(Long.class).subindexed()));
  }
}

Clojure:

(defmodule WhoToFollowModule
  [setup topologies]
  (declare-depot setup *follows-depot (hash-by :from))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate
      topology
      $$follows
      {Long (set-schema Long {:subindex? true})})
    ))

This defines a stream topology called “core”. Rama has two kinds of topologies, stream and microbatch, which have different properties. In short, streaming is best for interactive applications that need single-digit millisecond update latency, while microbatching has update latency of a few hundred milliseconds and is best for everything else. Streaming is used here under the assumption that a social network would want immediate feedback on a new follow taking effect.

Notice that the PState is defined as part of the topology. Unlike databases, PStates are not global mutable state. A PState is owned by a topology, and only the owning topology can write to it. Writing state in global variables is a horrible thing to do, and databases are just global variables by a different name.

Since a PState can only be written to by its owning topology, it’s much easier to reason about. Everything about it can be understood by just looking at the topology implementation, all of which exists in the same program and is deployed together. Additionally, the extra step of appending to a depot before processing the record to materialize the PState does not lower performance, as we’ve shown in benchmarks. Rama being an integrated system strips away much of the overhead that traditionally exists.

Let’s now add the code to materialize the PState:

Java:

public class WhoToFollowModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*follows-depot", Depot.hashBy("from"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate(
      "$$follows",
      PState.mapSchema(Long.class,
                       PState.setSchema(Long.class).subindexed()));
    topology.source("*follows-depot").out("*data")
            .each(Ops.GET, "*data", "from").out("*from")
            .each(Ops.GET, "*data", "to").out("*to")
            .localTransform("$$follows", Path.key("*from").voidSetElem().termVal("*to"));
  }
}

Clojure:

(defmodule WhoToFollowModule
  [setup topologies]
  (declare-depot setup *follows-depot (hash-by :from))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate
      topology
      $$follows
      {Long (set-schema Long {:subindex? true})})
    (<<sources topology
      (source> *follows-depot :> {:keys [*from *to]})
      (local-transform> [(keypath *from) NONE-ELEM (termval *to)] $$follows))
    ))

The code to implement the topology is only a few lines, but there’s a lot to unpack here. The business logic is implemented with dataflow. Rama’s dataflow API is exceptionally expressive, able to intermix arbitrary business logic with loops, conditionals, and moving computation between tasks. This post is not going to explore all the details of dataflow as there’s simply too much to cover. Full tutorials for Rama dataflow can be found on our website for the Java API and for the Clojure API.

Let’s go over each line of this topology implementation. The first step is subscribing to the depot:

Java:

topology.source("*follows-depot").out("*data")

Clojure:

(<<sources topology
  (source> *follows-depot :> {:keys [*from *to]})

This subscribes the topology to the depot *follows-depot and starts a reactive computation on it. Operations in dataflow do not return values. Instead, they emit values that are bound to new variables. In the Clojure API, the input and outputs to an operation are separated by the :> keyword. In the Java API, output variables are bound with the .out method.

Whenever data is appended to that depot, the data is emitted into the topology. The Java version binds the emit into the variable *data , and the Clojure version destructures the fields into the variables *from and *to . All variables in Rama code begin with a * . The subsequent code runs for every single emit.

Remember that last argument to the depot declaration, called the “depot partitioner”? That’s relevant here.

The depot partitioner determines on which task the append happens and thereby on which task computation begins for subscribed topologies. In this case, the depot partitioner says to hash by the “from” field of the appended data. The target task is computed by taking the hash and modding it by the total number of tasks. This means data with the same ID always goes to the same task, while different IDs are spread evenly across all tasks.
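
To make the routing concrete, here’s an illustrative Java sketch of the hash-mod idea. This is not Rama’s actual internal hash function, just the concept:

// Conceptually, an append is routed by hashing the "from" field and modding by
// the module's task count, so the same user always lands on the same task.
long from = 12345L;
int numTasks = 64;
int targetTask = Math.floorMod(Long.hashCode(from), numTasks);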

Rama gives a ton of control over how computation and storage are partitioned, and in this case we’re partitioning by the hash of the “from” user ID since that’s how we want the PState to be partitioned. This allows us to easily locate the set of user IDs any particular user ID is following.

The rest of the topology updates the PState to write the new follow:

Java:

.each(Ops.GET, "*data", "from").out("*from")
.each(Ops.GET, "*data", "to").out("*to")
.localTransform("$$follows",
                Path.key("*from")
                    .voidSetElem()
                    .termVal("*to"));

Clojure:

(local-transform> [(keypath *from) NONE-ELEM (termval *to)] $$follows)

In the Clojure version, the “from” and “to” fields were already destructured. In the Java version, the Ops.GET function is run on the *data map to fetch the “from” and “to” fields and bind them to the new variables *from and *to .

The PState is updated with the “local transform” operation. The transform takes in as input the PState $$follows and a “path” specifying what to change about the PState. When a PState is referenced in dataflow code, it always references the partition of the PState that’s located on the task on which the event is currently running.

Paths are a deep topic, and the full documentation for them can be found here. A path is a sequence of “navigators” that specifies how to hop through a data structure to target values of interest. A path can target any number of values, and they’re used for both transforms and queries. In this case, the path navigates by the key *from to the set of follows for that user. The next navigator, called NONE-ELEM in Clojure and voidSetElem() in Java, navigates to the “void” element of the set. Setting that “void” element to a value with the “term val” navigator causes that value to be added to that set.
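
For intuition, the effect of that transform on a single partition is roughly what this plain-Java sketch does to an in-memory map of sets (ignoring durability, replication, and subindexing):

// Rough in-memory analogy of Path.key("*from").voidSetElem().termVal("*to"):
// navigate to the set under the "from" key, creating it if absent, then add "to".
Map<Long, Set<Long>> follows = new HashMap<>();
long from = 1L, to = 2L;
follows.computeIfAbsent(from, k -> new HashSet<>()).add(to);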

That completes everything involved in materializing that PState.

Computing recommendations

As mentioned, recommendations will be computed every 30 seconds. This means the module needs to take action based on the passage of time.

Rama provides a facility for this called “tick depots”. Whereas a normal depot emits whenever new data is appended to it, a tick depot emits when the configured amount of time has passed. Subscribing to a tick depot from a topology is no different than subscribing to a regular depot. Here’s how the tick depot is declared in the module:

Java:

setup.declareTickDepot("*who-to-follow-tick", 30000);

Clojure:

(declare-tick-depot setup *who-to-follow-tick 30000)

This creates a tick depot named *who-to-follow-tick that emits every 30,000 milliseconds (30 seconds).

Let’s now declare the topology along with the PStates it will create:

Java:

MicrobatchTopology mb = topologies.microbatch("who-to-follow");
mb.pstate(
  "$$who-to-follow",
  PState.mapSchema(Long.class,
                   PState.listSchema(Long.class)));
mb.pstate("$$next-id", Long.class);

Clojure:

(let [mb (microbatch-topology topologies "who-to-follow")]
  (declare-pstate
    mb
    $$who-to-follow
    {Long [Long]})
  (declare-pstate mb $$next-id Long)
  )

A microbatch topology is used instead of a stream topology because single-digit millisecond latency on processing emits doesn’t matter for this asynchronous background work. Microbatch topologies have higher throughput, exactly-once processing semantics, and additional batching capabilities needed by this topology.

The computation for each iteration of “who to follow” recommendations will work as follows:

  • Fetch the set of users to compute new recommendations for this iteration
  • For each of those users:
    • Fetch all of their follows
    • Fetch all the follows of each of those follows to get all the recommendation candidates
    • Aggregate the number of times each candidate is reached in the previous step
    • Reverse sort the list of candidates by the count
    • Filter out candidates who are already followed by the user

Here’s the full code implementing this algorithm. There are multiple new concepts in this code, so I’ll explain it line by line afterwards.

Java:

mb.source("*who-to-follow-tick").out("*microbatch")
  .explodeMicrobatch("*microbatch")
  .batchBlock(
     Block.allPartition()
          .localSelect("$$next-id", Path.stay()).out("*start-id")
          .localSelect("$$follows",
                       Path.sortedMapRangeFrom("*start-id",
                                               SortedRangeFromOptions.maxAmt(15)
                                                                     .excludeStart())).out("*m")
          .ifTrue(new Expr(Ops.LESS_THAN, new Expr(Ops.SIZE, "*m"), 15),
             Block.localTransform("$$next-id", Path.termVal(-1L)),
             Block.each((SortedMap m) -> m.lastKey(), "*m").out("*max-id")
                  .localTransform("$$next-id", Path.termVal("*max-id")))
          .each(Ops.EXPLODE_MAP, "*m").out("*account-id", "*follows")
          .each(Ops.EXPLODE, "*follows").out("*following-id")
          .hashPartition("*following-id")
          .localSelect("$$follows", Path.key("*following-id")
                                        .all()).out("*candidate-id")
          .keepTrue(new Expr(Ops.NOT_EQUAL, "*account-id", "*candidate-id"))
          .hashPartition("*account-id")
          .compoundAgg(CompoundAgg.map("*account-id",
                                       CompoundAgg.map("*candidate-id",
                                                       Agg.count()))).out("*m")
          .each(Ops.EXPLODE_MAP, "*m").out("*account-id", "*candidate-counts")
          .each((Map cc) -> {
             ArrayList<Map.Entry<Long, Integer>> l = new ArrayList(cc.entrySet());
             l.sort(Map.Entry.comparingByValue());
             Collections.reverse(l);
             List ret = new LinkedList();
             for(int i=0; i < Math.min(1000, l.size()); i++) ret.add(l.get(i).getKey());
             return ret;
          }, "*candidate-counts").out("*candidate-order")
          .each((RamaFunction0) ArrayList::new).out("*who-to-follow")
          .loop(
             Block.ifTrue(new Expr((List w, List c) -> w.size() >= 300 || c.size() == 0,
                                   "*who-to-follow", "*candidate-order"),
               Block.emitLoop(),
               Block.yieldIfOvertime()
                    .each((LinkedList l) -> l.pop(), "*candidate-order").out("*candidate-id")
                    .localSelect("$$follows",
                                 Path.key("*account-id")
                                     .view(Ops.CONTAINS, "*candidate-id")).out("*already-follows?")
                    .ifTrue(new Expr(Ops.NOT, "*already-follows?"),
                      Block.each((List l, Long cid) -> l.add(cid), "*who-to-follow", "*candidate-id"))
                    .continueLoop()))
          .localTransform("$$who-to-follow", Path.key("*account-id").termVal("*who-to-follow")));

Clojure:

(<<sources mb
  (source> *who-to-follow-tick :> %microbatch)
  (%microbatch)
  (<<batch
    (|all)
    (local-select> STAY $$next-id :> *start-id)
    (local-select>
      (sorted-map-range-from *start-id
                             {:max-amt 15
                              :inclusive? false})
      $$follows :> *m)
    (<<if (< (count *m) 15)
      (local-transform> (termval -1) $$next-id)
     (else>)
      (key (-> *m rseq first) :> *max-id)
      (local-transform> (termval *max-id) $$next-id))
    (ops/explode-map *m :> *account-id *follows)
    (ops/explode *follows :> *following-id)
    (|hash *following-id)
    (local-select> [(keypath *following-id) ALL]
      $$follows :> *candidate-id)
    (filter> (not= *candidate-id *account-id))
    (|hash *account-id)
    (+compound {*account-id {*candidate-id (aggs/+count)}} :> *m)
    (ops/explode-map *m :> *account-id *candidate-counts)
    (mapv first
          (->> *candidate-counts
               vec
               (sort-by (comp - val))
               (take 1000))
          :> *candidate-order)
    (loop<- [*chosen []
             *candidate-order (seq *candidate-order)
             :> *who-to-follow]
      (<<if (or> (>= (count *chosen) (num-who-to-follow-recs))
                 (empty? *candidate-order))
        (:> *chosen)
       (else>)
        (yield-if-overtime)
        (first *candidate-order :> *candidate-id)
        (local-select>
          [(keypath *account-id) (view contains? *candidate-id)]
          $$follows :> *already-follows?)
        (<<if (not *already-follows?)
          (continue> (conj *chosen *candidate-id)
                     (next *candidate-order))
         (else>)
          (continue> *chosen (next *candidate-order)))
        ))
    (local-transform>
      [(keypath *account-id) (termval *who-to-follow)]
      $$who-to-follow)
    ))

Let’s go through how this works step by step, starting with subscribing to the tick depot:

Java:

mb.source("*who-to-follow-tick").out("*microbatch")
  .explodeMicrobatch("*microbatch")

Clojure:

(<<sources mb
  (source> *who-to-follow-tick :> %microbatch)
  (%microbatch)
  )

Like in the previous topology, the first line subscribes the topology to the tick depot. Unlike in stream topologies, though, the source subscription does not emit depot data directly. Microbatch topologies are pull based, and invoking the object emitted by the source subscription fetches whatever data has accumulated on the depot since the previous microbatch iteration.

The data is fetched in the Clojure API by invoking %microbatch as an operation and in the Java API by calling .explodeMicrobatch . For a tick depot, that call emits one time if a tick is ready and otherwise doesn’t emit at all.

The next bit of code determines which user ID to use to start fetching the set of users for this iteration:

Java:

.batchBlock(
   Block.allPartition()
        .localSelect("$$next-id", Path.stay()).out("*start-id")

Clojure:

(<<batch
  (|all)
  (local-select> STAY $$next-id :> *start-id))

The first line begins a “batch block”. Batch blocks enhance dataflow with the capabilities of relational languages like SQL, such as inner joins, outer joins, subqueries, and aggregation. Of those additional capabilities, this use case only needs aggregation, in order to aggregate candidate counts. Batch blocks don’t fundamentally change how dataflow works, and up until aggregation is used the batch block body works identically to any other dataflow code.

The first line of the batch block does an “all partition”. Partitioners change where subsequent code executes, and an “all partition” causes the subsequent code to run in parallel across all tasks of the module. The details of moving computation, like serializing and deserializing any variables referenced after the partitioner, are handled automatically. The code is linear without any callback functions even though partitioners could be jumping around to different tasks on different nodes.

In this case, computation needs to run on every task because a set of users will be selected from every partition of the $$follows PState. The selection of users and subsequent computation of recommendations all run in parallel.

Finally, the last line of this code selects the user ID contained in $$next-id . $$next-id contains the highest user ID on this partition from the last time the recommendations computation ran. A “local select” does a query on the partition of the PState that’s on the same task on which the event is running. As described before, this PState contains a single number per partition, so the query uses the “stay” navigator to extract the value into the variable *start-id .

The next line of code selects the users to compute recommendations for:

Java:

.localSelect("$$follows",
             Path.sortedMapRangeFrom(
               "*start-id",
               SortedRangeFromOptions.maxAmt(15)
                                     .excludeStart())).out("*m")

Clojure:

(local-select>
  (sorted-map-range-from *start-id
                         {:max-amt 15
                          :inclusive? false})
  $$follows :> *m)

Top-level maps in PStates are sorted, so this query uses the “sorted map range from” navigator to select a submap of that PState into the variable *m . The navigator takes in a start key and some options. In this case it selects at most 15 users starting from the key immediately after *start-id . Since *start-id was the maximum ID in the last iteration of recommendations, it is excluded from this iteration.

Note that it selects 15 users per task. So if the module was launched with 256 tasks, it will select 3,840 users per iteration. Since the microbatch runs every 30 seconds, that’s 7,680 users per minute. At that rate, if the application has 100M users, each user will have their recommendations recomputed about once every 9 days.
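
Spelling out that back-of-envelope arithmetic (using the 256-task deployment assumed above):

// 15 users per task, per 30-second iteration:
int usersPerIteration = 15 * 256;                         // 3,840
int usersPerMinute = usersPerIteration * 2;               // 7,680
long minutesForAllUsers = 100_000_000L / usersPerMinute;  // ~13,020 minutes, about 9 days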

The next bit of code updates $$next-id for the next iteration:

Java:

.ifTrue(new Expr(Ops.LESS_THAN, new Expr(Ops.SIZE, "*m"), 15),
   Block.localTransform("$$next-id", Path.termVal(0L)),
   Block.each((SortedMap m) -> m.lastKey(), "*m").out("*max-id")
        .localTransform("$$next-id", Path.termVal("*max-id")))

Clojure:

(<<if (< (count *m) 15)
  (local-transform> (termval -1) $$next-id)
 (else>)
  (key (-> *m rseq first) :> *max-id)
  (local-transform> (termval *max-id) $$next-id))

If the prior query selected less than 15 elements, that means the end of the PState has been reached. In that case, $$next-id is set to -1 in order to start from the beginning on the next iteration. Otherwise, it sets $$next-id to the maximum ID from the submap that was selected. In the Java version, you can see how arbitrary Java code can be mixed into a topology using lambdas.

You might be wondering about the fault-tolerance of setting $$next-id before computing the recommendations. What happens if the topology fails while computing recommendations due to a node dying and needs to be retried? Won’t setting $$next-id before completing the work cause it to skip a set of users?

The answer to that last question is no. Microbatch topologies have exactly-once semantics, and failures are handled by resetting all PStates to their state as of the last successful microbatch. Each microbatch is effectively a cross-partition transaction, no matter how complicated the logic.

Let’s take a look at the next line of the topology:

Java:

.each(Ops.EXPLODE_MAP, "*m").out("*account-id", "*follows")

Clojure:

(ops/explode-map *m :> *account-id *follows)

This emits every key/value pair of the selected submap and binds them to the variables *account-id and *follows . Note that this operation emits once for every entry in the map, so if the map has 15 elements it will emit 15 times.

The next line emits everyone that *account-id follows:

Java:

.each(Ops.EXPLODE, "*follows").out("*following-id")

Clojure:

(ops/explode *follows :> *following-id)

This simply emits everyone in the nested follows set bound to the variable *following-id .

Next, everyone that *following-id follows is fetched:

Java:

.hashPartition("*following-id")
.localSelect("$$follows", Path.key("*following-id")
                              .all()).out("*candidate-id")

Clojure:

(|hash *following-id)
(local-select> [(keypath *following-id) ALL]
               $$follows :> *candidate-id)

The code first calls “hash partition” to switch to the task containing data for that user. A hash partitioner works exactly like the aforementioned depot partitioner. Then, the code does a query on the partition of $$follows local to that task to emit one time for everyone that user follows. Each emit is bound to the variable *candidate-id .

The next line filters the original account ID from candidates, since we don’t want to recommend a user to follow themselves:

Java:

.keepTrue(new Expr(Ops.NOT_EQUAL, "*account-id", "*candidate-id"))

Clojure:

(filter> (not= *candidate-id *account-id))

This line emits once if the condition is true, and otherwise it doesn’t emit.

The next line gets all information for candidates for an account onto the same task for aggregation:

Java:

.hashPartition("*account-id")

Clojure:

(|hash *account-id)

Using a hash partitioner also ensures that the data for all the account IDs being processed in this microbatch is spread evenly across all tasks, maximizing the efficiency of the computation.

Next, all the information about how many times a candidate has been reached for each account ID is aggregated together:

Java:

.compoundAgg(CompoundAgg.map("*account-id",
                             CompoundAgg.map("*candidate-id",
                                             Agg.count()))).out("*m")

Clojure:

(+compound {*account-id {*candidate-id (aggs/+count)}} :> *m)

Up until here, the topology has worked just like the previous ETL topology: each operation processed emits from the preceding operation and itself emitted some number of times. Aggregators in dataflow are different in that they collect all the emits that happened during the execution of the batch block and combine them into a single value per task.

In this case, the aggregation is specified as “compound aggregation”, which allows one or more aggregations to be composed into a data structure. This compound aggregation produces a map from account ID to candidate ID to the number of times that candidate appeared for that account. The result of aggregation is bound to the variable *m , which will be bound one time on every task for this batch block.
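
For intuition, the aggregated value *m on a given task might look something like this (hypothetical IDs and counts):

// Hypothetical shape of *m: account ID -> (candidate ID -> number of times that
// candidate was reached through the account's follows).
Map<Long, Map<Long, Integer>> m =
  Map.of(1001L, Map.of(2002L, 3, 2003L, 1),
         1002L, Map.of(2002L, 2, 2004L, 5));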

The code after aggregation is called the “postagg”, and it’s normal dataflow just like before. What’s left in this topology is to sort the candidates by their count, filter out any that are already followed by that account, and then record the result into the $$who-to-follow PState.

The next line emits each aggregated account ID along with their candidate counts map:

Java:

.each(Ops.EXPLODE_MAP, "*m").out("*account-id", "*candidate-counts")

Clojure:

(ops/explode-map *m :> *account-id *candidate-counts)

The next line processes the map of candidate counts to pull out the top 1000 candidates:

Java:

.each((Map cc) -> {
   ArrayList<Map.Entry<Long, Integer>> l = new ArrayList(cc.entrySet());
   l.sort(Map.Entry.comparingByValue());
   Collections.reverse(l);
   List ret = new LinkedList();
   for(int i=0; i < Math.min(1000, l.size()); i++) ret.add(l.get(i).getKey());
   return ret;
}, "*candidate-counts").out("*candidate-order")

Clojure:

(mapv first
      (->> *candidate-counts
           vec
           (sort-by (comp - val))
           (take 1000))
      :> *candidate-order)

This logic is implemented with regular Clojure or Java code, with the Java code using a lambda to insert the logic. The new variable *candidate-order is bound with the list of candidates to check.

Now, each candidate needs to be checked to see if they’re already followed by *account-id . They will be checked one by one in a loop until enough recommendations have been found or there are no more candidates to check. The code could alternatively do an explode, check all the candidates, and then aggregate unfollowed candidates into a list, but that’s less efficient since it checks more candidates than necessary.

Here’s the start of the loop:

Java:

.each((RamaFunction0) ArrayList::new).out("*who-to-follow")
.loop(

Clojure:

(loop<- [*chosen []
         *candidate-order (seq *candidate-order)
         :> *who-to-follow]

The body of a dataflow loop can emit any number of times. Another iteration of a loop is initiated with continue> in Clojure or .continueLoop in Java. Loops can also have “loop variables” which are in scope for the body of the loop and are set to new values each time the loop is recurred. The Clojure version uses loop variables to keep track of accumulated recommendations and the candidates remaining to check, while the Java version does not use loop variables and uses mutable objects instead to track those.

The Clojure version initializes the list of chosen recommendations as an empty vector in the variable *chosen , and it initializes the seq of candidates to check in the variable *candidate-order .

The Java version initialized *candidate-order as a linked list in the previous step, and it will pop the front of that list in each iteration. Selected recommendations are stored in the ArrayList *who-to-follow .

The start of the loop body checks if there’s no more work to do:

Java:

Block.ifTrue(new Expr((List w, List c) -> w.size() >= 300 || c.size() == 0,
                      "*who-to-follow", "*candidate-order"),
  Block.emitLoop(),

Clojure:

(<<if (or> (>= (count *chosen) 300)
           (empty? *candidate-order))
  (:> *chosen)

If 300 candidates that aren’t already followed have been found or there are no more candidates to check, it emits from the loop. This causes the code subsequent to the loop to run.

The next code starts the else branch of that condition:

Clojure:

(else>)
 (yield-if-overtime)

In the Clojure API, (else>) divides the blocks of code handling the “then” and “else” conditions of an <<if . In the Java API, the “else” branch is another argument to ifTrue .

The first line of the “else” branch does a “yield if overtime”. To understand why this is needed, recall the physical layout of a module.

Only one event can run on a task at a time. So when an ETL event is running, any other requests to the task are queued behind it in the event queue on the task. This includes client PState reads, telemetry collection, and much more. Having exclusive access to a task like this is exceptionally powerful, letting you trivially do things like update multiple PStates atomically or do multiple reads across multiple PStates at the same point in time.

However, as the saying goes: with great power comes great responsibility. When coding topologies, you must make sure not to create events that hold a task thread for an excessive amount of time. If an event does, client PState queries to that task can be delayed for a long time if they happen to queue up while that long ETL event is running. Doing a few PState operations is completely fine, but doing a large number of PState queries in a loop can be excessive.

The “yield if overtime” operation solves this problem, and the only context it’s ever needed is in loops like this that do a lot of work on the same task. If the current event has been running for too much time (by default the time limit is 5 milliseconds), the code yields the thread to other events and then resumes in a future event. This allows other work on the task thread to interleave with the execution of the topology.

Similar to partitioners, the code after the “yield if overtime” call executes asynchronously, but the code is written linearly with no callback functions. This is the beauty of dataflow for asynchronous programming.

The next line determines the next candidate to check:

Java:

.each((LinkedList l) -> l.pop(), "*candidate-order").out("*candidate-id")

Clojure:

(first *candidate-order :> *candidate-id)

This simply grabs the first value in *candidate-order and binds it to the new variable *candidate-id .

Now the topology is ready to check if that candidate is already followed by *account-id :

Java:

.localSelect("$$follows",
             Path.key("*account-id")
                 .view(Ops.CONTAINS, "*candidate-id")).out("*already-follows?")

Clojure:

(local-select>
  [(keypath *account-id) (view contains? *candidate-id)]
  $$follows :> *already-follows?)

This does a PState query to see if *candidate-id is in the follows set for *account-id . The path navigates to the nested set by the key *account-id and then uses the view navigator to run a function on that set. The function checks if *candidate-id is contained in the set. The query emits the boolean result of that function to the variable *already-follows? .
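
As a rough in-memory analogy (again ignoring partitioning and subindexing), the query behaves like:

// Rough analogy of Path.key("*account-id").view(Ops.CONTAINS, "*candidate-id"):
// navigate to the account's follows set and check membership.
Map<Long, Set<Long>> follows = new HashMap<>();
long accountId = 1L, candidateId = 2L;
boolean alreadyFollows =
  follows.getOrDefault(accountId, Collections.emptySet()).contains(candidateId);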

The next code completes the loop body implementation:

Java:

.ifTrue(new Expr(Ops.NOT, "*already-follows?"),
  Block.each((List l, Long cid) -> l.add(cid), "*who-to-follow", "*candidate-id"))
.continueLoop()

Clojure:

(<<if (not *already-follows?)
  (continue> (conj *chosen *candidate-id)
             (next *candidate-order))
 (else>)
  (continue> *chosen (next *candidate-order)))

If the candidate is not already followed by *account-id , it’s added to the list of recommendations. Then the next loop iteration is invoked. The Clojure version updates the loop variables accordingly, while the Java version just continues the loop with no arguments since it’s keeping track of state in mutable objects.

The final code of the topology runs after the loop:

Java:

.localTransform("$$who-to-follow", Path.key("*account-id").termVal("*who-to-follow")

Clojure:

(local-transform>
  [(keypath *account-id) (termval *who-to-follow)]
  $$who-to-follow)

This simply writes the list of recommendations in *who-to-follow into the $$who-to-follow PState for *account-id .

No partitioner calls were necessary in the postagg since the aggregation was partitioned by the hash of account IDs, and all information for each account ID is stored on the same task.

Finally, it’s worth emphasizing that the entire topology runs in parallel across all tasks. For most of the topology you’re able to think in terms of one account ID at a time, but due to how dataflow works it’s running in a highly parallel way.

Fetching recommendations

Let’s look at an example of how you would fetch recommendations for an account ID from a client, such as in your web server. First, you would fetch a client for the PState:

Java:

Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
PState whoToFollowPState = manager.clusterPState("nlb.WhoToFollowModule", "$$who-to-follow");

Clojure:

(def manager (open-cluster-manager {"conductor.host" "1.2.3.4"}))
(def who-to-follow-pstate (foreign-pstate manager "nlb.who-to-follow/WhoToFollowModule" "$$who-to-follow"))

A “cluster manager” connects to a Rama cluster by specifying the location of its “Conductor” node. The “Conductor” node is the central node of a Rama cluster, which you can read more about here. From the cluster manager, you can retrieve clients to any depots or PStates for any module.
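
The cluster manager hands out depot clients the same way. Here’s a hedged sketch of appending a new follow from a web server, assuming a clusterDepot method analogous to clusterPState and reusing the createFollow helper from earlier:

// Get a client for the module's depot and append a new follow event. As described
// earlier, appends can be configured to return only after all downstream processing
// and PState updates have completed.
Depot followsDepot = manager.clusterDepot("nlb.WhoToFollowModule", "*follows-depot");
followsDepot.append(createFollow(123L, 456L));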

Querying this PState is trivial:

Java:

List recs = whoToFollowPState.selectOne(Path.key(123L));

Clojure:

(def recs (foreign-select-one (keypath 123) who-to-follow-pstate))

Client queries use the exact same path API as used for queries and transforms in topologies. This simply fetches the value for account ID 123 from the PState, which is the list of recommendations.

In a real Twitter application, this isn’t sufficient as you also need to filter out any recommendations that have been followed by the user since the last time recommendations were updated for them. In our Twitter-scale Mastodon implementation, we implemented this with a query topology to batch the checks rather than do many roundtrips from the client.

Summary

There’s a lot to learn with Rama, but you can see from this example application how much you can accomplish with very little code. Building a scalable and fault-tolerant batch-based recommendation engine is no small feat, but with Rama it only took 80 lines of code. There’s no additional work needed for deployment, updating, and scaling since that’s all built into Rama. For an experienced Rama programmer, a project like this takes only a few hours to fully develop, test, and have ready for deployment.

This example shows how powerful dataflow programming is. The code has total control over what executes and where, with no callback functions ever needed even though the logic is highly asynchronous and concurrent. Being able to write a loop that does arbitrary logic and jumps around the cluster as it goes is empowering.

The version of “who to follow” in our Twitter-scale Mastodon implementation also has additional handling to improve the experience for new users. New users have their recommendations recomputed immediately after passing a threshold of follows so that they get recommendations quickly rather than needing to wait potentially many days for the recommendation engine to select them. Recommendations are also augmented with the most-followed users on the platform when a user doesn’t have enough organic recommendations.

As mentioned earlier, there are GitHub projects for the Clojure version and the Java version containing all the code in this post. Those projects also have tests showing how to unit test modules in Rama’s “in-process cluster” environment.

You can get in touch with us at consult@redplanetlabs.com to schedule a free consultation to talk about your application and/or pair program on it. Rama is free for production clusters for up to two nodes and can be downloaded at this page.