This patchset adds std.Io and provides two implementations for it:
- std.Io.Threaded - based on a thread pool.
-fno-single-threaded- supports concurrency and cancellation.-fsingle-threaded- does not support concurrency or cancellation.
- std.Io.Evented - work-in-progress, experimental. This API is not ready to be used yet, but it serves to inform the evolution of
std.IoAPI.- IoUring implementation for Linux proof of concept. This backend has really nice properties but it's not finished yet.
- KQueue implementation, implemented enough to prove the concept, including fixing a common bug in other async runtimes.
std.Io.Threaded has networking and file-system operations implemented.
Cancellation works beautifully, except for a known race condition that has a
couple of competing solutions already in mind.
All of std.net has been deleted in favor of std.Io.net.
std.fs has been partially updated to use std.Io - only as required so that
std.Io.Writer.sendFile could use *std.Io.File.Reader rather than
*std.fs.File.Reader.
closes #8224
Laundry List of Io Features
async/await- these primitives express that operations can be done
independently, making them infallible and support execution on limited Io
implementations that lack a concurrency mechanism.concurrent- same asasyncexcept communicates that the operation
must be done concurrently for correctness. Requires memory allocation.cancel- equivalent toawaitexcept also requests the Io implementation
to interrupt the operation and returnerror.Canceled.std.Io.Threaded
supports cancellation by sending a signal to a thread, causing blocking
syscalls to returnEINTR, giving a chance to notice the cancellation request.select- API for blocking on multiple futures usingswitchsyntaxGroup- efficiently manages many async tasks. Supports waiting for and
cancelling all tasks in the group together.Queue(T)- Many producer, many consumer, thread-safe, runtime configurable
buffer size. When buffer is empty, consumers suspend and are resumed by
producers. When buffer is full, producers suspend and are resumed by consumers.- Avoids code bloat using a type safe wrapper around
TypeErasedQueue.
- Avoids code bloat using a type safe wrapper around
Select- for blocking on runtime-known number of tasks and handling a
subset of them.Clock,Duration,Timestamp,Timeout- type safety for units of measurementMutex,Condition- synchronization primitives
Demo
Here is an example that makes an HTTP request to a domain:
const std = @import("std"); const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Io = std.Io; const HostName = std.Io.net.HostName; pub fn main() !void { var debug_allocator: std.heap.DebugAllocator(.{}) = .init; const gpa = debug_allocator.allocator(); var threaded: std.Io.Threaded = .init(gpa); defer threaded.deinit(); const io = threaded.io(); const args = try std.process.argsAlloc(gpa); const host_name: HostName = try .init(args[1]); var http_client: std.http.Client = .{ .allocator = gpa, .io = io }; defer http_client.deinit(); var request = try http_client.request(.HEAD, .{ .scheme = "http", .host = .{ .percent_encoded = host_name.bytes }, .port = 80, .path = .{ .percent_encoded = "/" }, }, .{}); defer request.deinit(); try request.sendBodiless(); var redirect_buffer: [1024]u8 = undefined; var response = try request.receiveHead(&redirect_buffer); std.log.info("received {d} {s}", .{ response.head.status, response.head.reason }); }
Thanks to the fact that networking is now taking advantage of the new std.Io interface,
this code has the following properties:
- It asynchronously sends out DNS queries to each configured nameserver
- As each response comes in, it immediately, asynchronously tries to TCP connect to the
returned IP address. - Upon the first successful TCP connection, all other in-flight connection
attempts are canceled, including DNS queries. - The code also works when compiled with
-fsingle-threadedeven though the
operations happen sequentially. - No heap allocation.
You can see how this is implemented in std.Io.net.HostName.connect:
pub fn connect( host_name: HostName, io: Io, port: u16, options: IpAddress.ConnectOptions, ) ConnectError!Stream { var connect_many_buffer: [32]ConnectManyResult = undefined; var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer); var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options }); var saw_end = false; defer { connect_many.cancel(io); if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) { .connection => |loser| if (loser) |s| s.closeConst(io) else |_| continue, .end => break, }; } var aggregate_error: ConnectError = error.UnknownHostName; while (connect_many_queue.getOne(io)) |result| switch (result) { .connection => |connection| if (connection) |stream| return stream else |err| switch (err) { error.SystemResources, error.OptionUnsupported, error.ProcessFdQuotaExceeded, error.SystemFdQuotaExceeded, error.Canceled, => |e| return e, error.WouldBlock => return error.Unexpected, else => |e| aggregate_error = e, }, .end => |end| { saw_end = true; try end; return aggregate_error; }, } else |err| switch (err) { error.Canceled => |e| return e, } } pub const ConnectManyResult = union(enum) { connection: IpAddress.ConnectError!Stream, end: ConnectError!void, }; /// Asynchronously establishes a connection to all IP addresses associated with /// a host name, adding them to a results queue upon completion. pub fn connectMany( host_name: HostName, io: Io, port: u16, results: *Io.Queue(ConnectManyResult), options: IpAddress.ConnectOptions, ) void { var canonical_name_buffer: [max_len]u8 = undefined; var lookup_buffer: [32]HostName.LookupResult = undefined; var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer); host_name.lookup(io, &lookup_queue, .{ .port = port, .canonical_name_buffer = &canonical_name_buffer, }); var group: Io.Group = .init; while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) { .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }), .canonical_name => continue, .end => |lookup_result| { group.waitUncancelable(io); results.putOneUncancelable(io, .{ .end = lookup_result }); return; }, } else |err| switch (err) { error.Canceled => |e| { group.cancel(io); results.putOneUncancelable(io, .{ .end = e }); }, } }
Upgrade Guide
Missing io Parameter
If you need an io parameter, and you don't have one, you can get one like this:
var threaded: Io.Threaded = .init_single_threaded; const io = threaded.io();
This is legal as long as these functions are not called:
Io.VTable.concurrent
This is a non-ideal workaround - like reaching for std.heap.page_allocator when
you need an Allocator and do not have one. Instead, it is better to accept an
Io parameter if you need one (or store one on a context struct for convenience).
Point is that the application's main function should generally be responsible for
constructing the Io instance used throughout.
When you're testing you can use std.testing.io (much like std.testing.allocator).
How to use async/await
Use this pattern to avoid resource leaks and handle cancellation gracefully:
var foo_future = io.async(foo, .{args}); defer if (foo_future.cancel(io)) |resource| resource.deinit() else |_| {} var bar_future = io.async(bar, .{args}); defer if (bar_future.cancel(io)) |resource| resource.deinit() else |_| {} const foo_result = try foo_future.await(io); const bar_result = try bar_future.await(io);
If the foo or bar function does not return a resource that must be freed, then the if can be simplified to _ = foo() catch {}, and if the function returns void, then the discard can also be removed. The cancel is necessary however because it releases the async task resource when errors (including error.Canceled) are returned.
Related
- juicy main #24510
- Proposal: restricted function types #23367
- Proposal: stackless coroutines as low-level primitives #23446
- eliminate stack overflow #1639
- Builtin function to tell you the maximum stack size of a given function #157
Followup Issues
- Move Filesystem APIs to std.Io #25738
- std.Io.Threaded: handle systems that don't have preadv #25739
- std.Io.Threaded: glibc implementation of
netLookup#25740 - std.Io.Threaded: OpenBSD implementation of
netLookup#25741 - std.Io.Threaded: FreeBSD implementation of
netLookup#25742 - std.Io.Threaded: Darwin implementation of
netLookup#25743 - std.Io.net: non-concurrent versions of
HostName.lookupandHostName.connect#25744 - std.Io.Threaded: implement netInterfaceName #25745
- std.Io.Threaded: complete Windows implementation of netSendOne #25746
- std.Io.Threaded: implement netConnect with timeout #25747
- std.Io.Threaded: when concurrent tasks caused pool to be oversize, reduce pool down to cpu count upon idling #25748
- std.Io.Threaded: eliminate dependency on std.Thread (Mutex, Condition, maybe more) #25749
- std.Io.Threaded: move
max_iovecs_lentoIo#25750 - std.Io.Threaded cancellation race condition (signal received between
checkCanceland syscall) #25751 - migrate
std.process.ChildAPI intostd.Io#25752 - std: eliminate Io.poll #25753
- Eliminate the POSIX API layer #6600
- look into TCP fastopen and TCP_QUICKACK #14173
- std.Io.Threaded: audit
writeResolutionQuery#25754 - std.Io: add a vtable function and interface API for getting size of file only #25755
- std.testing: make tmpDir more efficient #25756
- std.Io.Threaded: improve efficiency with threadlocal run queues and work stealing #25757
- std.Io.Threaded: implement Mutex and Condition #25760
- ARM 32-bit test failure: http.test.test.trailers #25762
- add non-blocking flag to net and fs operations, handle EAGAIN
- Threaded: in netLookup, potentially add diagnostics to the output in several places