I/O Integrated Runtime Concept · typelevel cats-effect · Discussion #3070


Okay, so this is kind of a grander and crazier concept, but there's some evidence to suggest it could result in some fairly significant performance benefits, particularly for I/O-bound applications with high RPS, and especially in tail latencies. See this paper for some empirical justification for this line of work.

Introduction

The present state of the art on the JVM as it relates to asynchronous I/O itself is the approach that Netty and NIO2 take: a separate thread pool (or pools, in Netty's case) which manage the worker threads. The workers in turn are responsible for calling the platform-specific polling function, which is usually one of epoll, kqueue, select, or io_uring.

These functions, implemented by the OS kernel, are the fundamental primitive unit of asynchronous I/O. On a mechanical level, the way they work is by reading from a process-specific circular buffer within the kernel. This circular buffer contains I/O events for all continuations within that process (filtered at read by some selector, in the case of a few of these functions). The worker thread receives these events, reassociates them in user-space with the callbacks that were registered for each one individually, and then invokes those callbacks, which in turn resume the fibers corresponding to each one (i.e. completing the async). These functions all have tunable behavior in the event that the kernel circular buffer is empty: they can either return immediately with an empty set of events, or they can block the thread until events are available or until some timeout is reached.
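To make the shape of this concrete, here is a minimal sketch of that dispatch pattern. It is purely illustrative: epollWait, Event, and EventDispatcher are stand-in names, not the API of Netty, NIO2, or Cats Effect.

import scala.collection.concurrent.TrieMap

// stand-in for whatever the kernel hands back for a ready I/O operation
final case class Event(token: Long)

final class EventDispatcher(epollWait: Long => Array[Event]) extends Runnable {
  // user-space table mapping each registration token back to its callback
  private val callbacks = TrieMap.empty[Long, Event => Unit]

  def register(token: Long)(cb: Event => Unit): Unit = {
    callbacks.put(token, cb)
    ()
  }

  def run(): Unit =
    while (true) {
      // blocks in the kernel until events are ready (or the timeout elapses)
      val events = epollWait(100L)
      events.foreach { ev =>
        // reassociate each kernel event with its registered callback and invoke
        // it; in practice the callback resumes a fiber on the compute pool
        callbacks.remove(ev.token).foreach(_.apply(ev))
      }
    }
}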

At face value, it does seem relatively reasonable to devote a separate worker thread or threads to this problem. It ultimately reduces to a blocking I/O operation which multiplexes many distinct operations within a single call, and as we all know, blocking on compute threads is very bad. So in frameworks like Netty and NIO2, these worker threads are responsible for handling the blocking and then immediately shunting work back onto the compute pool by invoking the registered callbacks. Seems like a totally reasonable arrangement, right?

As a minor digression, it's worth pointing out that this is exactly the same as the way that IO.sleep is implemented. Namely, there is a separate scheduler thread which just loops on Thread.sleep (well, technically LockSupport.parkNanos), sleeping for the minimum delay across all registered timers, then dispatching all relevant timers every time it wakes. When it dispatches these timers, it simply completes callbacks which transfer control flow back to the compute pool.
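For illustration, the timer side of this pattern looks something like the following sketch. This is not the actual Cats Effect scheduler: names and structure are simplified, identical deadlines colliding and races between registration and dispatch are glossed over.

import java.util.concurrent.ConcurrentSkipListMap
import java.util.concurrent.locks.LockSupport

final class SleepScheduler extends Runnable {
  // timers keyed by absolute deadline in nanos; the values are the callbacks
  // which transfer control flow back to the compute pool
  private val timers = new ConcurrentSkipListMap[Long, Runnable]

  def sleep(delayNanos: Long, resume: Runnable): Unit = {
    timers.put(System.nanoTime() + delayNanos, resume)
    ()
  }

  def run(): Unit =
    while (true) {
      val now = System.nanoTime()
      // dispatch every timer whose deadline has passed
      while (!timers.isEmpty && timers.firstKey() <= now) {
        timers.pollFirstEntry().getValue.run()
      }
      // park for the minimum delay across all registered timers
      // (or a default tick when there are none)
      val next = if (timers.isEmpty) 1000000L else timers.firstKey() - now
      LockSupport.parkNanos(next)
    }
}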

The problem with this paradigm is that these event dispatch threads are not free. In fact, if you have a particularly active I/O bus (as in a service which is operating at high RPS), the worker thread will spend relatively little time actually blocking, since the polling syscall will almost always return some events which must be dispatched. This in turn means that the dispatch threads need time on the physical CPU, which evicts threads that are managing actual compute, causing page faults and resource contention.

The generally accepted solution to this problem at present is to restrict the event dispatcher threads to a very small number (often just one), and then shrink the compute pool by the same number to "make space" for the event dispatchers on the underlying hardware. Thus, if you have n physical threads and you need k event dispatchers (including both asynchronous I/O workers and timer managers), then you only have n - k compute threads.

Additionally, you have to pay context switch penalties every time you cross an I/O or timer (e.g. sleep) barrier. A fiber which begins on carrier thread A and suspends (using async) waiting for an I/O event will be awakened by the event dispatcher thread, which will invoke the callback, which will go onto the external queue of the runtime, and then can be picked up by thread B, which must then page the fiber working set into its physical cache. All of this round-trips through the kernel scheduler and pays the corresponding synchronization and contention penalties.

To make matters worse, this kind of situation isn't an absolute. The downsizing of the compute pool to n - k workers is required only when the event dispatch threads are fully active and never blocking, corresponding to a situation where I/O events are happening with a frequency which matches the processing rate of the thread itself. This certainly happens in extremely high RPS services, but it is far from a given, and even extremely high scale systems tend to have relative spikes and lulls in traffic. Additionally, some applications are more compute or memory bound, or bound by blocking I/O (usually block filesystems), which can result in indirect throttling of asynchronous activity, in turn reducing both the necessity and the effectiveness of downsizing the compute pool.

And if this all weren't bad enough, the consequences of this inefficiency are immensely difficult to measure. Page faults are one of those things that are, almost by definition, not particularly well instrumented on modern platforms. Java Flight Recorder can give you some hints, as can monitoring procfs, employing dtrace and similar kernel introspection tools, but at the end of the day it is very difficult to get a definitive measurement of how much loss you are suffering. Thus, most people tend to ignore these costs altogether and just run with a compute pool sized to n, allowing the event dispatch threads to heavily contend. I've even recommended exactly this configuration, since it's universally less awful than the alternatives, but that doesn't mean it is optimal.

Refer once again to the paper linked in the first paragraph (which anecdotally lines up almost exactly with some of the work I've done on measuring scheduler contention costs in I/O-heavy applications). The practical losses here may be on the order of 30-50%, particularly in tail latencies (which is how most modern services are scaled), which is not a small amount of inefficiency by any stretch of the imagination.

So... can we do better?

Concept

Yes. :-)

Consider the design of libuv (the runtime which underlies Node.js). In general, most libuv applications only have a single thread which must handle compute tasks, timer management, and asynchronous I/O. This is accomplished by the worker proceeding in the following (simplified) loop:

  1. Enqueue any completed timers
  2. Take the next compute queue task and execute
  3. If compute queue is empty, invoke polling syscall (e.g. epoll), blocking for up to timeout milliseconds, where timeout is the minimum delay across all outstanding timers
  4. If the compute queue is non-empty, invoke polling syscall (e.g. epoll) without blocking
  5. Loop

Recall that polling syscalls, like epoll, can either block for up to some timeout whenever there are no outstanding events, or they can just return immediately. The former is what we see in step 3, while the latter is what we see in step 4.
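As a point of reference, here is a rough Scala transliteration of that loop. It is illustrative only: the poll parameter stands in for the platform-specific syscall and is assumed to return the continuations associated with any ready events, and the registration helpers are invented for the sketch.

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.duration._

final class SingleThreadedEventLoop(
    // invokes the polling syscall (e.g. epoll) with the given timeout and
    // returns the continuations associated with any ready events
    poll: Duration => Array[Runnable]) {

  private val compute = new ConcurrentLinkedQueue[Runnable]
  // absolute deadlines (nanos) paired with timer continuations
  private var timers = List.empty[(Long, Runnable)]

  def execute(task: Runnable): Unit = { compute.add(task); () }

  def registerTimer(deadlineNanos: Long, task: Runnable): Unit =
    timers = (deadlineNanos, task) :: timers

  def run(): Unit =
    while (true) {
      val now = System.nanoTime()

      // 1. enqueue any completed timers
      val (due, pending) = timers.partition(_._1 <= now)
      timers = pending
      due.foreach { case (_, t) => compute.add(t) }

      // 2. take the next compute queue task and execute it
      val task = compute.poll()
      if (task != null) {
        task.run()
        // 4. compute queue was non-empty: poll without blocking
        poll(Duration.Zero).foreach(compute.add)
      } else {
        // 3. compute queue is empty: block in the syscall for at most the
        //    delay until the nearest outstanding timer (forever if none)
        val timeout =
          if (timers.isEmpty) Duration.Inf
          else (timers.map(_._1).min - now).nanos
        poll(timeout).foreach(compute.add)
      }
      // 5. loop
    }
}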

There is no particular reason why we cannot do this in the Cats Effect runtime. In particular, the Work Stealing Thread Pool already has the bones of this, and the implementation strategy for cooperative polling timers is already very similar to what is outlined above, with the main difference being the use of epoll with a timeout rather than parkNanos. The main modifications to the libuv strategy which will be required for the WSTP will be along the following lines (a rough sketch of the resulting worker loop follows the list):

  • We probably only want to run the syscall once every n iterations of the worker loop. At present, we poll the external queue once every 64 iterations (going off memory, it might be 128). We certainly wouldn't want to epoll any more often than that unless both the internal and external queues are empty.
  • Since every worker is polling, we would ideally add some extra state management to ensure that workers only poll for I/O events which relate to the fibers which suspended on that worker. This is going to have some very complex interactions with stealing, and my guess is that we will need to allow some race conditions where a worker polls for events related to fibers that were just stolen, and thus ends up indirectly stealing the work back. So long as this doesn't land us in an invalid state, it's okay for the performance to be suboptimal in these edge cases.
  • We need to ensure that the native syscall respects JVM thread interruption, possibly using a strategy like this one. The runtime needs to be able to wake up parked worker threads when new work comes in.
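Putting those pieces together, a very rough sketch of what such a modified worker loop might look like is below. Worker, WorkQueue, and the poll and nextTimerDelay parameters are hypothetical placeholders rather than actual WSTP internals, and stealing is elided entirely.

import scala.concurrent.duration._

trait WorkQueue {
  def dequeue(): Runnable // returns null when empty
}

final class Worker(
    local: WorkQueue,
    external: WorkQueue,
    poll: Duration => Unit,         // Duration.Zero means "don't block"
    nextTimerDelay: () => Duration  // min outstanding timer delay, Inf if none
) extends Runnable {

  def run(): Unit = {
    var iteration = 0
    while (true) {
      // only hit the syscall (and the external queue) periodically
      if (iteration % 64 == 0) poll(Duration.Zero)

      val task = {
        val t = local.dequeue()
        if (t != null) t else external.dequeue()
      }

      if (task != null) {
        task.run()
      } else {
        // both queues are empty: block in the syscall, but never past the
        // point at which the next local timer is due
        poll(nextTimerDelay())
      }

      iteration += 1
    }
  }
}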

Notably, none of this is implementable without support from the WSTP worker threads themselves. For example, if you attempted to do this type of thing by wrapping polling syscalls in IO(...), those calls would indeed happen on the compute threads, but you would have no way of knowing the local worker state with respect to timers, or whether you should make a blocking or non-blocking syscall. To make matters worse, you have no way of "pinning" your polling calls to particular worker threads, which is an essential property of this implementation: each worker must poll independently, and the polling work cannot be stolen by other workers.
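To illustrate why, consider the naive library-level approach; epollWait here is a placeholder for a hypothetical native binding, and the whole thing is a sketch of what doesn't work rather than a recommendation.

import cats.effect.IO

object NaivePolling {
  // a naive attempt at running the polling syscall on the compute pool
  def naivePoll[Event](epollWait: Long => Array[Event]): IO[Array[Event]] =
    IO(epollWait(0L))
    // this does run on *a* compute thread, but:
    //   - it cannot see that worker's local state (outstanding timers, queue
    //     depth), so it cannot choose between a blocking and non-blocking call
    //   - it cannot be pinned to a particular worker, so the polling work can
    //     be stolen and dissociated from the fibers which registered interest
}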

Thus, this is something which we need to integrate directly into the runtime itself. However, Cats Effect is explicitly not an I/O framework (despite the name of its primary abstraction), and it has been very successful in fostering an ecosystem that builds on top of it as a concurrency and asynchrony management layer, pushing the I/O syscalls themselves into either an underlying runtime (in the case of Scala.js) or external frameworks (in the case of the JVM).

Unfortunately, with its current set of abstractions, Cats Effect makes it impossible to achieve the kinds of efficiencies that would be theoretically possible if the worker threads performed the I/O polling, precisely because of the issues outlined above with a wrapping IO(...) implementation strategy. The best that can be done by third parties is simply what is already done today: maintain a separate pool of event dispatchers. Thus, if we want to do better in this area, Cats Effect needs to expose some additional primitive or primitives which make it possible to compose this functionality into the core runtime loop.

Proposed Sketch

In theory, this can be done by parameterizing the WorkStealingThreadPool, and in turn, IORuntime itself. Specifically, a type signature like the following would get us close to what is necessary:

def syspoll(timeout: Duration): Array[Event]

A third-party library, downstream of Cats Effect, could implement this function to delegate to epoll/kqueue/io_uring/etc, and in turn, the function could be invoked by the worker threads of the WSTP during their core loop. Duration.Zero would naturally signify "don't block", while Duration.Inf would signify "no timers, block forever".
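One rough shape this parameterization could take is sketched below. PollingSystem and everything in it other than the syspoll signature itself are hypothetical illustrations of the proposal, not an existing Cats Effect API.

import scala.concurrent.duration.Duration

trait PollingSystem {
  type Event

  // invoked by each WSTP worker thread from within its run loop;
  // Duration.Zero means "don't block", Duration.Inf means "block forever"
  def syspoll(timeout: Duration): Array[Event]
}

// a downstream library would supply the platform-specific implementation,
// e.g. delegating to epoll through a native binding (elided here)
object EpollSystem extends PollingSystem {
  type Event = Long // e.g. the registration token for a ready file descriptor
  def syspoll(timeout: Duration): Array[Event] = ???
}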

We can actually take this even further by defining it as part of IOApp, which already performs some IORuntime configuration. Such a downstream library could theoretically define a subtype of IOApp which provides a concrete implementation of this function, and perhaps materializes a Network[IO] (from fs2-io). This kind of implementation would allow frameworks like Ember and Skunk to transparently inherit the improved performance.
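Concretely (and entirely hypothetically), usage could end up looking something like the following; EpollIOApp and the runtime-configuration hook it would override do not exist today.

import cats.effect.{ExitCode, IO, IOApp}

// hypothetical downstream trait: installs the integrated poller into the
// IORuntime configuration (via some yet-to-be-designed hook) and could also
// expose an fs2.io.net.Network[IO] backed by it
trait EpollIOApp extends IOApp

// applications would inherit the integrated runtime transparently, as would
// libraries like Ember and Skunk resolving Network[IO] from it
object Main extends EpollIOApp {
  def run(args: List[String]): IO[ExitCode] =
    IO.println("serving over the integrated poller").as(ExitCode.Success)
}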

To be clear, this hypothetical downstream I/O library would not be mandatory. Users would always be free to use something like NIO2, Netty, Blaze, or anything else. It would simply be a compositional element of the ecosystem.

There are even some preliminary experiments with this concept in the form of Arman's epollcat prototype.

Unfortunately, the above signature isn't quite sufficient. We need some way for fibers to signify that they have suspended specifically for the purposes of an asynchronous I/O operation, and that set of fibers from the local worker must be passed to the syspoll function. Its Array[Event] return type would then need to be modified to indicate the subset of those fibers which should be awakened with a particular set of data. These suspensions, and even the data being passed around, would be specific to the downstream library, meaning that this would need to be done in a fashion which is opaque to the worker thread itself, without impairing performance.

I don't have good solutions to this. Yet. :-) It feels more like a "type tetris" problem than a fundamental impediment to the concept.

Prior Art

This really isn't a new idea. Tokio actually does something somewhat similar to this, and continues to represent fertile inspiration for the future of our ecosystem. Obviously, libuv itself is also quite a bit like this, though it doesn't take things to the extent of integrating polling into a self-balancing, multi-threaded scheduler.

Arman's PollingExecutionContext for Scala Native is a relatively limited single-threaded implementation of this type of concept. This original hack to make IO work on Scala Native in the first place was what inspired this line of thinking.

On the JVM itself though, the only framework I'm aware of which even attempts such a tight integration between compute, timers, and asynchronous I/O is Netty. Unfortunately, Netty pipelines are awkward in practice, not particularly compositional, and suffer greatly from issues of compute unit granularity. This means that most people try to get off of the Netty worker pool as quickly as possible to avoid corrupting fairness, which is a problem that Cats Effect already solves quite well. This in turn means that Cats Effect is uniquely positioned to provide this type of functionality, bringing the benefits of this type of thread topology to the JVM in a real way for the first time.

Next Steps

Bikeshed! Come up with crazy ideas! Tell me this is a terrible concept. Let's discuss. :-)