I've been working on a new service/product for our companies (Phaistos Networks and BestPrice), one that incorporates “new” technologies and some powerful and important paradigms/principles for performance, scalability, and simplicity.
Based on my experiments, benchmarks, and the work implementing the system outlined later, I am confident that such designs can provide benefits that were hard, if not practically impossible, to realize in the past.
This new service implements various functions related to Digital Marketing (Ad Server, programmatic advertising, contextual targeting, brand safety, and much more) and, thanks to its design, supporting more ideas and functions should be easy and very straightforward.
It will eventually replace our current suite of services that implement some of that functionality and will serve as the foundation technology for all our Digital Marketing services and products, and the core technology will be used to build new services for both our companies going forward.
This blog post outlines key ideas and technologies involved in this design. It is not an exhaustive analysis, but I hope it will be of use to people interested in high performance, scalable designs.
Reactors
The service is based on an event-driven reactor design. There are multiple OS threads in use, each of them running its own reactor (“reactor core”). The reactor is responsible for multiplexing I/O (network and disk) and coroutine execution, managing timers, and more.
The first reactor (“reactor0”) is special in many ways, one of them being that it is the reactor that accepts and processes API requests, while all other reactors accept and process “ads” requests (which include requests for ads to be displayed in a page, clicks, RTB (Real-Time Bidding), and many more).
For managing timers, we use EBTs for timers that can be canceled, linked lists (will likely use a ring-buffer design later) for timers that can’t be canceled and whose expiration time is monotonically increasing, and binary heaps (priority queues) for timers that can’t be canceled but whose expiration time is not monotonically increasing.
Having three different trackers for timers makes sense because not all timers have the same requirements and the cost of managing, for example, a binary heap is lower than the cost of managing an EBT.
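To make the three-tracker idea concrete, here is a minimal sketch. It is not the service's code: `Timers` and its member names are hypothetical, and `std::multimap` merely stands in for the EBT (any ordered structure that supports cheap removal by handle would do for the illustration).

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical sketch: one tracker per timer class.
struct Timers {
    using Callback = std::function<void()>;
    using Tree = std::multimap<std::uint64_t, Callback>; // stand-in for the EBT
    using CancelHandle = Tree::iterator;

    struct HeapEntry {
        std::uint64_t when;
        Callback cb;
        bool operator>(const HeapEntry &o) const { return when > o.when; }
    };

    Tree cancellable;                                      // can be canceled by handle
    std::deque<std::pair<std::uint64_t, Callback>> fifo;   // expirations only grow: append/pop-front
    std::priority_queue<HeapEntry, std::vector<HeapEntry>, std::greater<>> heap; // min-heap

    CancelHandle add_cancellable(std::uint64_t when, Callback cb) {
        return cancellable.emplace(when, std::move(cb));
    }
    void cancel(CancelHandle h) { cancellable.erase(h); }
    void add_monotonic(std::uint64_t when, Callback cb) { fifo.emplace_back(when, std::move(cb)); }
    void add_unordered(std::uint64_t when, Callback cb) { heap.push({when, std::move(cb)}); }

    // Fire everything due at `now`; returns the number of timers fired.
    std::size_t expire(std::uint64_t now) {
        std::size_t fired = 0;
        while (!cancellable.empty() && cancellable.begin()->first <= now) {
            Callback cb = std::move(cancellable.begin()->second);
            cancellable.erase(cancellable.begin());
            cb();
            ++fired;
        }
        while (!fifo.empty() && fifo.front().first <= now) {
            Callback cb = std::move(fifo.front().second);
            fifo.pop_front();
            cb();
            ++fired;
        }
        while (!heap.empty() && heap.top().when <= now) {
            Callback cb = heap.top().cb; // top() is const, so copy before popping
            heap.pop();
            cb();
            ++fired;
        }
        return fired;
    }
};
```

The reactor would call something like `expire(now)` once per loop iteration; only timers that genuinely need cancellation pay for the tree.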
Here’s the reactor event loop:
Single Writer Principle
This powerful principle is used extensively. For every resource, there’s only one owner (reactor). Reactor0 is the owner of almost all resource types, which is convenient because only API requests update them (and as explained earlier, only reactor0 processes such requests).
Reactor1 owns some other important data. Other reactors that need to access that data must suspend their current coroutine, submit a task to reactor1, and wait to be resumed with the result of that task.
In this example, the current coroutine is suspended and a callable is scheduled on reactor1. Once this is invoked and the result is available, reactor1 sends a message to the reactor of the suspended coroutine, and the coroutine is resumed and gets the value from the callable.
There are other such facilities for message passing/scheduling callables.
- reactors_task() can be used to invoke a callable on all other reactors and collect their results before the suspended coroutine is resumed.
- reactor_task() can be used to schedule a callable to be executed by a specific reactor.
- background_task() can be used to schedule a callable to be executed by a thread in a special background threads pool.
- all_results() can be used to suspend the current coroutine and wait for results from multiple awaitables.
- timeout() can be used to suspend the current coroutine for as many milliseconds as required. This is handy for simulating workloads and testing.
Resources Registry
There is a resources registry, which effectively manages maps for the various resource types. Each reactor has its own registry, and they all “eventually” match reactor0’s registry.
Whenever a resource is updated, we use RCU (Read-Copy-Update): we first create a copy of the resource, update the copy, and replace the original with it in the ‘local’ registry, and then use “reactors tasks” to update the registries of all other reactors as well.
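A minimal sketch of that update path, under assumptions: `Campaign`, `Registry` and `rcu_update()` are hypothetical names, and the propagation to the other registries is a plain loop here, whereas the service does it with “reactors tasks”.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct Campaign { // hypothetical resource type
    std::uint64_t id;
    std::string name;
    std::uint32_t budget;
};

struct Registry { // one per reactor; index 0 plays the role of reactor0
    std::unordered_map<std::uint64_t, const Campaign *> campaigns;
};

// Read-Copy-Update on the owner: clone, mutate the clone, publish it everywhere.
// Returns the superseded object so the caller can hand it to the GC.
template <typename Mutator>
const Campaign *rcu_update(std::vector<Registry> &registries, std::uint64_t id, Mutator mutate) {
    Registry &owner = registries.front();
    const Campaign *old = owner.campaigns.at(id);
    auto *fresh = new Campaign(*old); // copy
    mutate(*fresh);                   // update the private copy
    owner.campaigns[id] = fresh;      // replace in the local registry
    for (std::size_t i = 1; i < registries.size(); ++i)
        registries[i].campaigns[id] = fresh; // stand-in for the "reactors task" step
    return old;
}
```

Readers only ever see a complete old version or a complete new version, never a half-updated object.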
This is simple and powerful, but there’s a catch. Because most resources also need to track their children (by pointer), a naive RCU-based scheme would be expensive.
This is because when a resource is updated, its pointer changes (because we create and update a clone in the RCU dance). Because its parent tracks it by pointer, the parent needs to be updated to point to the new object(new address), and because it was updated, then its parent also needs to be updated, all the way to the root resource.
This is in practice not that expensive especially considering that most resources are infrequently updated, but it’s definitely not optimal. To get around this, we treat edges (references to children) as special resource types. Each resource that needs to track edges for any resources acquires an edges ID during construction. That ID never changes.
If a child of a resource is updated, then we only need to use RCU to update the edges (resource) of the parent for that type of children resources, without having to RCU the parent (and thus all other top-level resources).
This works great. The runtime that manages them is somewhat involved for performance reasons, but it’s one of the situations where such a tradeoff is worth it.
This example illustrates how that works
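A rough sketch of the edges idea, with hypothetical types (`Ad`, `Campaign`, `Edges`, `Registry`, `update_ad()`); retired objects are simply collected in vectors here, where the service would hand them to the epochs GC.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct Ad { std::uint64_t id; std::string title; };        // child resource
using Edges = std::vector<const Ad *>;                       // edges are a resource of their own
struct Campaign { std::uint64_t id; std::uint64_t ads_edges_id; }; // edges ID never changes

struct Registry {
    std::unordered_map<std::uint64_t, const Campaign *> campaigns;
    std::unordered_map<std::uint64_t, const Edges *> edges;
    std::vector<const Ad *> retired_ads;     // would go to the GC in the service
    std::vector<const Edges *> retired_edges;
};

// Update a child: RCU the child and the parent's edges object, never the parent.
const Ad *update_ad(Registry &reg, const Campaign &parent, const Ad *old_ad, std::string new_title) {
    auto *fresh_ad = new Ad(*old_ad);
    fresh_ad->title = std::move(new_title);

    const Edges *old_edges = reg.edges.at(parent.ads_edges_id);
    auto *fresh_edges = new Edges(*old_edges);
    std::replace(fresh_edges->begin(), fresh_edges->end(), old_ad,
                 static_cast<const Ad *>(fresh_ad));
    reg.edges[parent.ads_edges_id] = fresh_edges; // parent still holds the same edges ID

    reg.retired_ads.push_back(old_ad);
    reg.retired_edges.push_back(old_edges);
    return fresh_ad;
}
```

Because the parent refers to its children through a stable ID instead of a pointer, its own address never has to change, and the update stops there instead of cascading to the root.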
C++20 Coroutines
I've been waiting for C++20 coroutines for a long time.
It’s a game-changer, just like io_uring is a game-changer technology. Coroutines in C++ are very powerful, and very expressive, thanks to a very elegant design. I've been experimenting with coroutines for a couple of years now, using various libraries and rolling my own implementations, but it never felt right: stackful coroutines are too wasteful/limiting for the kind of use cases I am interested in, and the performance isn’t great. The brilliant folks who designed and implemented C++20 coroutines nailed it though.
Each request or task is decomposed into potentially multiple coroutines, and each such coroutine may be implemented as multiple coroutines, and so on, all of them running concurrently. This is such a powerful abstraction, especially when it’s practically a zero-cost facility.
Futures are a popular abstraction for encapsulating work (data, code) and continuations. There are some very real issues with futures though. It is hard to make sense of the code if the use of futures is fairly involved. They are also slow. If you are interested in that, you should watch this fantastic talk by Eric Niebler and David Hollman.
This is not a problem with C++20 coroutines. The programming model makes a lot of sense (very sane), there is practically no overhead, especially for suspending and resuming coroutines, and this being directly supported by the compiler (as opposed to futures) allows for all kinds of optimizations that just aren’t possible with futures.
simdjson
All API and “ads” requests represented as JSON objects are parsed using the fantastic simdjson library. If the request (“document”) to parse is too large, a background task is used instead, because otherwise it would stall the reactor’s OS thread for potentially hundreds of microseconds, and nothing is allowed to stall the reactor cores for more than a few microseconds.
Epochs
Whenever a resource is updated or deleted, it is not immediately destroyed. Instead, an epochs-based GC is used, where the “old” resource is scheduled for GC as soon as it is safe to do so.
There is still a single reactor that owns all resources, but how do we prevent a resource that’s currently accessed (read-only) by other reactors from being deleted, which would result in a crash, or worse? Reference counting is often used to that end, and while this generally works, it’s not great. You need to bloat structures with a reference counter, you need atomic operations for retaining and releasing (incrementing and decrementing) references, and it’s easy to forget to retain, or release, which will certainly lead to a crash. This is tedious at best, and very risky and expensive at worst. With epochs, you don’t need to do any of that.
The epochs design is based on Microsoft’s FASTER epochs design.
There are reader and writer epoch control objects: effectively, one writer for reactor0 (because it’s the only reactor permitted to modify resources), and a reader for every reactor. If reactor0 needs to dispose of any object (because it is either deleted, or it is cloned and its clone will replace it), it uses its writer to schedule a GC for it as soon as it’s safe to do so. If any such GCs are scheduled when processing a request, then reactor0 also advances the writer epoch before processing is over. Readers, on the other hand, initiate a “read-only transaction” when they need to access any resources, and terminate it when they are done. This is the epochs API, which hopefully explains this better.
Here’s an example that illustrates how epochs are used by readers and writers
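As a rough illustration of the protocol, here is a minimal, single-threaded sketch. It is neither the service's API nor FASTER's: `Epochs` and its method names are hypothetical, and a real multi-reactor implementation has to publish epochs across threads safely, which this sketch ignores entirely.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical sketch of epoch-based reclamation (logic only, no threading).
struct Epochs {
    std::uint64_t current = 1;
    std::vector<std::uint64_t> reader_epoch; // 0 means the reader is idle
    std::vector<std::pair<std::uint64_t, std::function<void()>>> pending;

    explicit Epochs(std::size_t readers) : reader_epoch(readers, 0) {}

    // Readers: bracket every access to shared resources.
    void begin_read(std::size_t reader) { reader_epoch[reader] = current; }
    void end_read(std::size_t reader) { reader_epoch[reader] = 0; }

    // Writer: schedule `drop` for when no reader can still see the old object.
    void defer(std::function<void()> drop) { pending.emplace_back(current, std::move(drop)); }
    void advance() { ++current; }

    // Run every deferred action tagged with an epoch older than all active readers.
    std::size_t collect() {
        std::uint64_t safe = current;
        for (std::uint64_t e : reader_epoch)
            if (e != 0 && e < safe) safe = e;
        std::size_t ran = 0;
        for (auto it = pending.begin(); it != pending.end();) {
            if (it->first < safe) {
                it->second();
                it = pending.erase(it);
                ++ran;
            } else {
                ++it;
            }
        }
        return ran;
    }
};
```

An object retired in epoch E is only dropped once the writer has advanced past E and every active reader started in a later epoch, so no reader pays for a reference count or an atomic increment per access.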
No Locks
Other than a mutex used to protect a queue used for scheduling “background tasks” to a background threads pool, there are no other locks or atomic operations used. Because the resources are shared by reactor0 with all other reactors as strictly read-only immutable “objects”, there is no need to serialize access to them. There are only lock-free queues, used for, effectively, message passing between threads (reactor threads and threads in the background threads pool).
LW Locks
Reactor0 still needs to serialize access to some resources sometimes, even if such resources are exclusively owned by that reactor. Processing a request may suspend its coroutine, more requests may be processed before that processing resumes, and any of those other requests (or tasks) may need access to the same resource. So we need to support both shared (multiple readers, no writers) and exclusive (no readers, single writer) “light-weight locks”.
When attempting to acquire such a lock and that’s not possible, the current coroutine is suspended, and once the lock is released by the coroutine that acquired it, the suspended coroutine is resumed and can acquire the lock.
The problem with locks, and it is a problem, is that when an execution context needs to hold multiple locks at the same time, then unless those locks are always taken in the same order (deterministically), it is very easy to get into a deadlock. This is why applications that need to hold multiple locks at the same time often order (sort) resources/locks before they attempt to acquire the locks. While this works most of the time, it doesn’t work all the time.
The LW Locks implementation can identify deadlocks with zero overhead. When it does, it throws a special exception, and when that propagates back to the trampoline function (which is responsible for creating the coroutine for processing a request, or a task), that trampoline function backs off for 1s and tries again, and if it fails again, it reports an error. This is a far better alternative to crashing, or doing nothing (in that case, whatever locks and other resources were acquired or created by the coroutine would not have been released, which would eventually cause the program to hang).
The implementation of those lw-locks can be found on GitHub.
Memory Management
Each reactor reuses all allocated memory, and also recycles coroutine frames for the allocation of new coroutine frames. This is all done for performance. C++20 coroutines allow customizing coroutine behavior via a promise_type structure. One can provide operator new() and operator delete() implementations which access reactor::cur() for allocating and releasing (for future reuse) coroutine frames. Network and disk I/O use byte buffers, which are also reused, along with pretty much every other object (including vectors, and such).
io_uring
io_uring is incredible. Long story short, it’s not just a solution to the many woes related to asynchronous disk I/O on Linux; it can also help with blocking I/O, and will likely expand to include support for operations not strictly related to network and disk I/O.
This service probes for io_uring (based on kernel release) availability, and if it is available, it uses it for both network and disk I/O. The performance gains are very impressive, and the programming model is very elegant.
Circuit Breakers
There are circuit breakers of sorts in use for controlling access to resources. For example, each reactor tracks the number of active requests being processed (each of them with its own coroutine). If that gets too high, the reactor stops listening for new connections, and therefore stops accepting new requests, until that number drops below a low watermark. This is trivial to implement because all reactors accept connections on the same listen address (using SO_REUSEADDR and SO_REUSEPORT), so closing the accept FD in one reactor means that the kernel will distribute more connections to the other reactors.
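The watermark logic amounts to a small hysteresis state machine. A sketch, with a hypothetical `AcceptGate` type and made-up thresholds (the post doesn't state the actual values):

```cpp
#include <cstddef>

// Hypothetical per-reactor gate: stop accepting at the high watermark,
// start again only once the load drops below the low watermark.
struct AcceptGate {
    std::size_t high_watermark;
    std::size_t low_watermark;
    std::size_t active = 0;
    bool accepting = true;

    void request_started() {
        if (++active >= high_watermark) accepting = false; // the reactor would close its accept FD here
    }
    void request_finished() {
        if (--active < low_watermark) accepting = true;    // ...and listen again here
    }
};
```

Using two thresholds rather than one keeps the reactor from flapping between the two states when the load hovers around the limit.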
Many resources are protected by a gate guard which allows a finite number of consumers/users. Once this is exceeded, the coroutine attempting to access them is suspended and resumed later when possible, similarly to how LW Locks work. The service also needs to access remote services; access to them is gated via a circuit breaker based on exponential backoff and rate-limiting. The overhead of those systems is practically ‘zero’ because suspending and resuming coroutines is very cheap (comparable to the cost of a function call).
Background Tasks
Reactor cores, each using an OS thread, should never stall. If they were to stall for longer than a few microseconds, all coroutines, disk, and network I/O processing would also stall. At the same time, some tasks may take a relatively long time to execute, so offloading those tasks to other threads, suspending the coroutine, and resuming it once the task has been executed is a good idea.
There is a threads pool and a policy-based scheduling system for scheduling tasks with those threads. There are three policies: Policy0 for very high priority tasks, Policy1 for medium priority tasks, and Policy2 for low priority tasks.
There is a single threads pool with 10 threads. The first 4 are assigned to policy0, and the remaining 6 are divided between policy1 and policy2. Policy1 is used when processing API requests, and policy2 is used for analytics tasks (where we don’t care so much for latency).
What’s perhaps interesting is how tasks are dequeued by those threads based on their policy. We need a futex and 3 bits, one per policy, each denoting that 1 or more tasks are pending for that class. The 4 threads assigned to policy0 monitor the first bit (see FUTEX_WAIT_BITSET). The 3 threads assigned to policy1 monitor the first two bits. The other 3 threads assigned to policy2 monitor all 3 bits.
This means that policy0 (high priority) tasks will be processed by any of the 10 threads, policy1 tasks will be processed by any of the 6 remaining threads, and policy2 (lowest priority) tasks can only be processed by the last 3 threads, when they are idle.
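The bit assignment can be checked with a few lines of code. This sketch only computes the masks; the names (`kPolicy0`, `wait_mask()`, `eligible_threads()`) are hypothetical, and the actual waiting would be done by passing the mask to the futex system call with FUTEX_WAIT_BITSET.

```cpp
#include <cstdint>

// One "tasks pending" bit per policy.
constexpr std::uint32_t kPolicy0 = 1u << 0;
constexpr std::uint32_t kPolicy1 = 1u << 1;
constexpr std::uint32_t kPolicy2 = 1u << 2;
constexpr unsigned kThreads = 10;

// Bitset a pool thread would wait on, following the split described above:
// threads 0-3 belong to policy0, 4-6 to policy1, 7-9 to policy2.
constexpr std::uint32_t wait_mask(unsigned thread_index) {
    if (thread_index < 4) return kPolicy0;
    if (thread_index < 7) return kPolicy0 | kPolicy1;
    return kPolicy0 | kPolicy1 | kPolicy2;
}

// How many threads can be woken for a task of the given policy.
constexpr unsigned eligible_threads(std::uint32_t policy_bit) {
    unsigned n = 0;
    for (unsigned i = 0; i < kThreads; ++i)
        if (wait_mask(i) & policy_bit) ++n;
    return n;
}
```

A waker that signals with a single policy bit only reaches threads whose wait mask contains that bit, which is what produces the 10 / 6 / 3 split.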
Data-Oriented Design
The efficient use of the underlying hardware is important for performance and scalability. In practice, this is mostly about memory access patterns: while CPUs are very fast, access to RAM is still relatively very slow, which is why minimizing CPU cache misses (along with branch mispredictions, when possible, and write-write contention, data stalls, and data dependencies) is one of the primary ways to accomplish that.
Data-Oriented Design is an alternative programming paradigm to OOP, where, roughly speaking, execution “kernels” operate on data tightly packed together. For example, if you were controlling alien spaceships, instead of representing each such ship as an object with a position, name, and a bunch of other properties, you instead have different arrays for the different properties of all the spaceships, and different kernels for operating on positions, names, etc.
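The spaceship example might look like this in code; `Fleet` and `integrate()` are made-up names for illustration.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Structure-of-arrays layout: one array per property, indexed by ship.
struct Fleet {
    std::vector<float> x, y;   // positions
    std::vector<float> vx, vy; // velocities
    std::vector<std::string> name;

    std::size_t add(std::string ship_name, float px, float py, float dx, float dy) {
        x.push_back(px);
        y.push_back(py);
        vx.push_back(dx);
        vy.push_back(dy);
        name.push_back(std::move(ship_name));
        return x.size() - 1;
    }
};

// A "kernel" that only touches positions and velocities. Names are never
// pulled into the cache while it runs.
void integrate(Fleet &fleet, float dt) {
    for (std::size_t i = 0; i < fleet.x.size(); ++i) {
        fleet.x[i] += fleet.vx[i] * dt;
        fleet.y[i] += fleet.vy[i] * dt;
    }
}
```

The loop walks four contiguous float arrays, which is friendly to both the cache and the compiler's vectorizer; an array of ship objects would drag every unrelated field through the cache as well.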
Those talks by Mike Acton and Stoyan Nikolov should explain the concepts properly and in-depth. You are trading some convenience (no longer dealing with objects) for performance, and in some cases, that performance difference is very substantial. While this may not be necessary for most tasks, it can be extremely valuable for extracting absolute performance where it matters.
This service uses DOD techniques for various tasks. There are “Systems” (i.e., kernels) that process “Components” (i.e., properties of some objects packed together), running one after the other in background threads.
Batching
This is probably an obvious suggestion to make, but I am going to include it here anyway. All io_uring operations and background tasks submitted are enqueued and processed as a batch only once in every reactor run-loop iteration. Submitting background tasks takes on average 7us, and submitting io_uring tasks takes on average (depending on the number and type of operations) 12us. If, for example, we have 1000 concurrent coroutines multiplexed by a reactor, each of them submitting one io_uring op and one background task in the context of the same reactor run-loop iteration, you get a stall/overhead of close to 20 milliseconds (1000 x 19us), which is very high. If you have 100,000 coroutines, then you have stalls of close to 2 seconds, which is obviously prohibitive. You really want to minimize, if not practically eliminate, stalls. With batching you only need 7 + 12us or so, no matter how many ops and tasks were scheduled in the context of that run-loop iteration.
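In code, the batching pattern boils down to something like the following sketch. `Batcher` is a hypothetical helper, and `submit` stands for whatever actually pays the fixed cost (an io_uring submission, or taking the background queue's mutex).

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical per-reactor batcher: coroutines enqueue, the run loop flushes once.
template <typename Op>
struct Batcher {
    std::vector<Op> pending;
    std::function<void(const std::vector<Op> &)> submit; // pays the fixed cost once per batch
    std::size_t submissions = 0;

    // Called by coroutines during the iteration: no submission cost yet.
    void enqueue(Op op) { pending.push_back(std::move(op)); }

    // Called once at the end of every run-loop iteration.
    void flush() {
        if (pending.empty()) return;
        submit(pending);
        ++submissions;
        pending.clear();
    }
};
```

With 1000 coroutines each enqueueing one op in the same iteration, `submit` still runs only once, so the fixed cost is paid once instead of 1000 times.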
Please stay safe, stay inside, and help those in need if you can.