std: Introduce `Io` Interface by andrewrk · Pull Request #25592 · ziglang/zig

This patchset adds std.Io and provides two implementations for it:

  • std.Io.Threaded - based on a thread pool.
    • -fno-single-threaded - supports concurrency and cancellation.
    • -fsingle-threaded - does not support concurrency or cancellation.
  • std.Io.Evented - work-in-progress, experimental. This API is not ready to be used yet, but it serves to inform the evolution of the std.Io API.
    • IoUring implementation for Linux, as a proof of concept. This backend has really nice properties but it's not finished yet.
    • KQueue implementation, implemented enough to prove the concept, including fixing a common bug in other async runtimes.

std.Io.Threaded has networking and file-system operations implemented.
Cancellation works beautifully, except for a known race condition, for which
a couple of competing solutions are already in mind.

All of std.net has been deleted in favor of std.Io.net.

std.fs has been partially updated to use std.Io - only as required so that
std.Io.Writer.sendFile could use *std.Io.File.Reader rather than
*std.fs.File.Reader.

closes #8224

Laundry List of Io Features

  • async/await - these primitives express that operations can be done
    independently, which makes them infallible and lets them run on limited Io
    implementations that lack a concurrency mechanism.
  • concurrent - same as async except communicates that the operation
    must be done concurrently for correctness. Requires memory allocation.
  • cancel - equivalent to await except also requests the Io implementation
    to interrupt the operation and return error.Canceled. std.Io.Threaded
    supports cancellation by sending a signal to a thread, causing blocking
    syscalls to return EINTR, giving a chance to notice the cancellation request.
  • select - API for blocking on multiple futures using switch syntax
  • Group - efficiently manages many async tasks. Supports waiting for and
    cancelling all tasks in the group together.
  • Queue(T) - Multi-producer, multi-consumer, thread-safe, runtime configurable
    buffer size. When buffer is empty, consumers suspend and are resumed by
    producers. When buffer is full, producers suspend and are resumed by consumers.
    • Avoids code bloat using a type safe wrapper around TypeErasedQueue.
  • Select - for blocking on runtime-known number of tasks and handling a
    subset of them.
  • Clock, Duration, Timestamp, Timeout - type safety for units of measurement
  • Mutex, Condition - synchronization primitives

Demo

Here is an example that makes an HTTP request to a domain:

const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Io = std.Io;
const HostName = std.Io.net.HostName;

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    const gpa = debug_allocator.allocator();

    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();

    const io = threaded.io();

    const args = try std.process.argsAlloc(gpa);

    const host_name: HostName = try .init(args[1]);

    var http_client: std.http.Client = .{ .allocator = gpa, .io = io };
    defer http_client.deinit();

    var request = try http_client.request(.HEAD, .{
        .scheme = "http",
        .host = .{ .percent_encoded = host_name.bytes },
        .port = 80,
        .path = .{ .percent_encoded = "/" },
    }, .{});
    defer request.deinit();

    try request.sendBodiless();

    var redirect_buffer: [1024]u8 = undefined;
    const response = try request.receiveHead(&redirect_buffer);
    std.log.info("received {d} {s}", .{ response.head.status, response.head.reason });
}

Because networking now uses the new std.Io interface, this code has the
following properties:

  • It asynchronously sends out DNS queries to each configured nameserver
  • As each response comes in, it immediately, asynchronously tries to TCP connect to the
    returned IP address.
  • Upon the first successful TCP connection, all other in-flight connection
    attempts are canceled, including DNS queries.
  • The code also works when compiled with -fsingle-threaded, although the
    operations then happen sequentially.
  • No heap allocation.

You can see how this is implemented in std.Io.net.HostName.connect:

pub fn connect(
    host_name: HostName,
    io: Io,
    port: u16,
    options: IpAddress.ConnectOptions,
) ConnectError!Stream {
    var connect_many_buffer: [32]ConnectManyResult = undefined;
    var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);

    var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
    var saw_end = false;
    defer {
        connect_many.cancel(io);
        if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) {
            .connection => |loser| if (loser) |s| s.closeConst(io) else |_| continue,
            .end => break,
        };
    }

    var aggregate_error: ConnectError = error.UnknownHostName;

    while (connect_many_queue.getOne(io)) |result| switch (result) {
        .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
            error.SystemResources,
            error.OptionUnsupported,
            error.ProcessFdQuotaExceeded,
            error.SystemFdQuotaExceeded,
            error.Canceled,
            => |e| return e,

            error.WouldBlock => return error.Unexpected,

            else => |e| aggregate_error = e,
        },
        .end => |end| {
            saw_end = true;
            try end;
            return aggregate_error;
        },
    } else |err| switch (err) {
        error.Canceled => |e| return e,
    }
}

pub const ConnectManyResult = union(enum) {
    connection: IpAddress.ConnectError!Stream,
    end: ConnectError!void,
};

/// Asynchronously establishes a connection to all IP addresses associated with
/// a host name, adding them to a results queue upon completion.
pub fn connectMany(
    host_name: HostName,
    io: Io,
    port: u16,
    results: *Io.Queue(ConnectManyResult),
    options: IpAddress.ConnectOptions,
) void {
    var canonical_name_buffer: [max_len]u8 = undefined;
    var lookup_buffer: [32]HostName.LookupResult = undefined;
    var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);

    host_name.lookup(io, &lookup_queue, .{
        .port = port,
        .canonical_name_buffer = &canonical_name_buffer,
    });

    var group: Io.Group = .init;

    while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
        .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
        .canonical_name => continue,
        .end => |lookup_result| {
            group.waitUncancelable(io);
            results.putOneUncancelable(io, .{ .end = lookup_result });
            return;
        },
    } else |err| switch (err) {
        error.Canceled => |e| {
            group.cancel(io);
            results.putOneUncancelable(io, .{ .end = e });
        },
    }
}

Upgrade Guide

Missing io Parameter

If you need an io parameter, and you don't have one, you can get one like this:

var threaded: Io.Threaded = .init_single_threaded;
const io = threaded.io();

This is legal as long as these functions are not called:

  • Io.VTable.concurrent

This is a non-ideal workaround, like reaching for std.heap.page_allocator when
you need an Allocator and do not have one. Instead, it is better to accept an
Io parameter if you need one (or store one on a context struct for convenience).
The point is that the application's main function should generally be
responsible for constructing the Io instance used throughout.
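
The advice above could look roughly like the following sketch, in which main constructs the Io instance once and hands it down through a context struct. App, run, and double are hypothetical names made up for this example; the Io.Threaded setup mirrors the demo earlier in this description.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical context struct: carries the Io and Allocator chosen by main.
const App = struct {
    io: Io,
    gpa: std.mem.Allocator,
};

/// Hypothetical unit of work.
fn double(n: u32) u32 {
    return n * 2;
}

/// Uses the Io stored on the context instead of constructing its own.
fn run(app: *const App) u32 {
    var future = app.io.async(double, .{21});
    return future.await(app.io);
}

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    defer _ = debug_allocator.deinit();
    const gpa = debug_allocator.allocator();

    // main decides which Io implementation the whole program uses.
    var threaded: Io.Threaded = .init(gpa);
    defer threaded.deinit();

    const app: App = .{ .io = threaded.io(), .gpa = gpa };
    std.debug.print("{d}\n", .{run(&app)});
}
```

Keeping the choice in main means library code such as run never needs to know whether it is running on Io.Threaded or some other implementation.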

When testing, you can use std.testing.io (much like std.testing.allocator).
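
For instance, a test might look like this minimal sketch. The add function is a hypothetical stand-in for whatever code needs an Io, and the io.async and await calls follow the shapes shown elsewhere in this description.

```zig
const std = @import("std");

/// Hypothetical function under test.
fn add(a: u32, b: u32) u32 {
    return a + b;
}

test "async with std.testing.io" {
    // No Io.Threaded setup needed here: the test uses the Io provided by std.testing.
    const io = std.testing.io;

    var future = io.async(add, .{ 1, 2 });
    const sum = future.await(io);

    try std.testing.expectEqual(@as(u32, 3), sum);
}
```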

How to use async/await

Use this pattern to avoid resource leaks and handle cancellation gracefully:

var foo_future = io.async(foo, .{args});
defer if (foo_future.cancel(io)) |resource| resource.deinit() else |_| {}

var bar_future = io.async(bar, .{args});
defer if (bar_future.cancel(io)) |resource| resource.deinit() else |_| {}

const foo_result = try foo_future.await(io);
const bar_result = try bar_future.await(io);

If the foo or bar function does not return a resource that must be freed, then the if can be simplified to _ = foo() catch {} (where foo() stands for the cancel call), and if the function returns void, then the discard can also be removed. The cancel is still necessary, however, because it releases the async task resource when errors (including error.Canceled) are returned.
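
To make the simplest case concrete, here is a sketch of the same pattern for functions that return an error union with a void payload, so that neither the if nor the discard is needed. The step and runSteps functions and the StepFailed error are hypothetical, and the sketch assumes, as the pattern above does, that calling cancel after await is harmless.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical task: returns no resource, only success or failure.
fn step(ok: bool) error{StepFailed}!void {
    if (!ok) return error.StepFailed;
}

fn runSteps(io: Io, a_ok: bool, b_ok: bool) !void {
    var a = io.async(step, .{a_ok});
    // Nothing to free, so the deferred cancel only discards a possible error.
    defer a.cancel(io) catch {};

    var b = io.async(step, .{b_ok});
    defer b.cancel(io) catch {};

    // If the first await fails, the deferred cancels still release both tasks.
    try a.await(io);
    try b.await(io);
}
```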

Related

Followup Issues