Add `Flux.unfold` by asakaev · Pull Request #3897 · reactor/reactor-core

2 min read Original article ↗

Thank you for considering my proposal! I'm excited to discuss how this aligns with the existing API.

Functional approach

The current SynchronousSink<T> API, while effective, operates at a low level and encourages side effects. In contrast, using a pure function S -> (T, S) eliminates these risks and offers a more reliable way to model state transitions.

The proposed unfold operator introduces a declarative approach to codata definition, where each step produces the next value and state. This leads to clearer, more maintainable code:

// evens [0, 2, 4, 6, ...]
Flux.unfold(0, s -> Optional.of(Tuples.of(s, s + 2)));

// fib [0, 1, 1, 2, 3, 5, 8, ...]
Flux.unfold(Tuples.of(0, 1), s -> {
    var a = s.getT1();
    var b = s.getT2();
    return Optional.of(Tuples.of(a, Tuples.of(b, a + b)));
});

// countdown [3, 2, 1]
Flux.unfold(3, s -> s == 0 ? Optional.empty() : Optional.of(Tuples.of(s, s - 1)));

Asynchronous variant

State-of-the-art libraries like fs2 and zio-streams have set the pattern with unfoldEval and unfoldZIO. In our case, an API like this would be a logical extension:

abstract <A, S> Flux<A> unfoldMono(S init, Function<S, Mono<Optional<Tuple2<A, S>>>> f);

However, this is beyond the scope of this PR, as Flux.generate requires immediate state calculation, which doesn't align with asynchronous Mono.

Next steps

If the proposal is interesting, I’ll add documentation and tests. For a deeper theoretical foundation, I recommend Conal Elliott’s talk on folds and unfolds.

Thank you again for your consideration!