Parallel streaming in Haskell: Part 1 – Fast, efficient, and fun (channable.com)
I enjoy programming with conduits, but somehow have never come across a big enough problem to really make use of them. They encapsulate a whole class of problems in a very comprehensible and modular way. The higher-order conduits like `scanlC` and friends also enable more complicated ideas than the usual `map` and `reduce` variants you typically see. It's quite straightforward to model all sorts of chat-/websocket-/trading-bots as conduits, for example; I wrote a blog post about it at https://wjwh.eu/posts/2022-05-06-haskell-conduit-bots.html.
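For instance, a running total is a one-liner with `scanlC`. A minimal sketch of my own (assuming the `Conduit` module from the conduit package; this is not code from the linked post):

```haskell
import Conduit

-- Stream a list through a running-total step; scanlC behaves like
-- Prelude.scanl, so the initial accumulator is emitted as well.
runningTotals :: Monad m => [Int] -> m [Int]
runningTotals xs = runConduit $ yieldMany xs .| scanlC (+) 0 .| sinkList

main :: IO ()
main = print =<< runningTotals [1, 2, 3, 4]
```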
Btw @rkrzr, I hope you don't mind me asking, but "niet geschoten is altijd mis" (a shot never taken always misses): are you looking for freelance assistance at the moment at all? I've been consistently impressed by the engineering work done at Channable, for example the Aho-Corasick work and the stuff relating to compact regions, but I'm not able to commit to full-time work at the moment. Lately I've been mostly working with Ruby and DBA/db performance stuff, but Haskell is the first language I got properly good at and it will always have a warm place in my heart. I'm pretty experienced at the intersection between backend dev and DBA/devops type problems, and if you are experiencing any problems in that area I would love to help out. Looking at the Channable GitHub page, I just realized I even made some PRs for Icepeak way back in 2020. :)
You don't need to save them for "big enough problems". I've used them with great success in a microservice architecture: I had to consume some events, do some processing, and spit out some data into Kafka. Absolutely straightforward code; I could just focus on the "processing" part and ignore the boring plumbing. I'm sure a lot of apps fit this pattern really well: consume data, process it, spit out new data.
Thanks for the kind words @WJW, we are pretty proud of our engineering culture as well :)
We are unfortunately not looking for freelance assistance at the moment, but we regularly have Haskell Engineering positions open, so perhaps that will be interesting to you in the future.
That's too bad but understandable in this economy. I did notice the open position on the careers page, but long term employment does not suit my situation at the moment. Best of luck and keep me in mind if the situation changes! :)
Nice to see more articles on data flow techniques. They deserve way more attention as a great whole-program model for all sorts of problem domains.
Do these streams have any notion of back pressure? I can't find any info on that.
For real-world distributed systems back pressure is very important. Otherwise you could DoS yourself with some cascading overload.
A conduit pulls information from earlier conduits in the same pipeline, only requesting the next item when it needs more input to satisfy a request for a value from a conduit later in the pipeline. This means that within the pipeline itself there is perfect backpressure: you cannot overload any buffers between conduits because there aren't any.
Of course, depending on the rest of the system you could still have a bottleneck somewhere. For example: if you pull jobs from a queue in (say) Redis but don't have enough capacity to process jobs faster than they are enqueued, the queue will eventually fill up.
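A tiny sketch of what that looks like in practice (my own example, not from the article): because the pipeline is demand-driven, a source composed with `takeC 3` only ever produces three elements, even though it could produce infinitely many.

```haskell
import Conduit
import Control.Monad.IO.Class (liftIO)

-- The source announces every element it produces. Downstream only demands
-- three elements, so only three "producing" lines are ever printed.
noisySource :: ConduitT i Int IO ()
noisySource = mapM_ (\n -> liftIO (putStrLn ("producing " ++ show n)) >> yield n) [1 ..]

main :: IO ()
main = do
  xs <- runConduit $ noisySource .| takeC 3 .| sinkList
  print (xs :: [Int])  -- [1,2,3]
```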
Precisely. Conduits basically do everything on demand.
For parallel processing the backpressure becomes a bit more complicated, in particular if you end with a sequential consumer. If you only do your parallel tasks when the sequential consumer demands them, you'll find yourself still doing one task at a time. The straightforward solution is to work ahead a bit, spawning just enough parallel work to fill a small buffer that the consumer can read from. You get backpressure this way, but may have done a little too much work if the consumer doesn't want the rest of the data. We'll show this in the third blog post of the series.
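To make the idea concrete, here is a rough sketch of such a work-ahead buffer using `async` and a bounded `TBQueue`. This is just an illustration of the principle, not the code from the upcoming post; the names and the exact setup are mine:

```haskell
import Control.Concurrent.Async (async, wait, replicateConcurrently_)
import Control.Concurrent.STM

-- A fixed pool of workers pulls inputs and pushes results into a queue
-- bounded at bufSize. Because writeTBQueue blocks when the queue is full,
-- the workers can only run a bounded amount ahead of the sequential
-- consumer. Note: results come out in completion order, not input order.
workAhead :: Int -> Int -> (a -> IO b) -> [a] -> (b -> IO ()) -> IO ()
workAhead nWorkers bufSize work inputs consume = do
  inputVar <- newTVarIO inputs
  outQ     <- newTBQueueIO (fromIntegral bufSize)

  let takeInput = atomically $ do
        xs <- readTVar inputVar
        case xs of
          []       -> pure Nothing
          (x:rest) -> writeTVar inputVar rest >> pure (Just x)

      worker = do
        mx <- takeInput
        case mx of
          Nothing -> pure ()
          Just x  -> do
            y <- work x
            atomically (writeTBQueue outQ (Just y))  -- blocks when the buffer is full
            worker

  producers <- async $ do
    replicateConcurrently_ nWorkers worker
    atomically (writeTBQueue outQ Nothing)           -- end-of-stream marker

  -- The sequential consumer drains the buffer until the end marker arrives.
  let drain = do
        next <- atomically (readTBQueue outQ)
        case next of
          Nothing -> pure ()
          Just y  -> consume y >> drain
  drain
  wait producers
```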
> We'll show this in the third blog post of the series.
Thanks for the pointer! Will have a look.
Thanks for the explanation!
So it's pull-based and not push-based like most other streaming libs.
Does anyone know how this compares to FS2 or Iteratees then? (Both are also pull-based streaming solutions.)
https://en.wikipedia.org/wiki/Iteratee
Looks quite similar to me. Is the Scala FS2 lib maybe even a clone of the Haskell solution? Or are they different in important aspects?
"Many of the ideas in FS2 were first tried and developed in there [Haskell]".
Conduit is pretty much a direct descendant of iteratee ideas in Haskell. It started about here, as far as I know: https://hackage.haskell.org/package/iteratee
Fantastic presentation of this side of FP/Haskell I never got to play with. Many thanks.
There are conduit actions (like dedup, sort) that require all the previous action's output to be ready and available before they can start producing any output. All the discussion of GC-friendliness of caches makes me think that there aren't any conduits that have more data than can fit in memory on a single machine.
If you had conduits that were too big to fit in memory at once, would you (channable) stream them to local disk (either explicitly or just using virtual memory)? Or would you be able to distribute work between multiple machines with a cluster-aware Conduit type?
Your scheduler could split the job up into multiple machines and run the same Conduit pipeline on all the machines, and only the conduit steps that need to communicate with each other would do so.
Separate question, do you have any actions that produce more output than their input? I could imagine some customers might find it useful to generate the cartesian product of two inputs, or the power set of one input.
> All the discussion of GC-friendliness of caches makes me think that there aren't any conduits that have more data than can fit in memory on a single machine.
Yep! That's currently still the case. We do have some ideas to put caches on disk using mmapped files, so that you have fast access when it all fits in memory but the OS can also drop them when it wants to.
For the moment we just use instances with 128GB memory, and those can still fit the datasets of the biggest customers that we have right now. Datasets go up to 10s of GBs, so it's not really 'big data' that needs to be distributed across machines. Due to the regular occurrence of aggregations (sort/group/window/deduplicate) that require at least _some_ synchronization, we only have small sections that can be completely parallelized. It's already a challenge to use all the cores on a single machine in an efficient manner, and I don't think we'll achieve much by using multiple machines for a single job. We've discussed a lot of this in an earlier blog post here: https://www.channable.com/tech/why-we-decided-to-go-for-the-...
> Separate question, do you have any actions that produce more output than their input?
Yep, we don't have cartesian products but we do have a 'split' action. Typical usage might be that a customer has a "sizes" field with values like "M,L,XL" and that they would split on that field so that a single item becomes three items for the separate sizes. The increase in the number of items is usually limited, and the increase in memory usage is even smaller because at most points during the data processing we only store the changed fields, and refer to the original item for the rest. In these cases multiple items will point to the same original item.
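As a toy illustration of such a fan-out step (my own sketch with a made-up Item type, not Channable's actual data model), a split on a comma-separated sizes field is essentially a `concatMapC`:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import qualified Data.Text as T

data Item = Item { itemTitle :: T.Text, itemSizes :: T.Text } deriving Show

-- One incoming item fans out into one outgoing item per size value.
splitSizes :: Monad m => ConduitT Item Item m ()
splitSizes = concatMapC $ \item ->
  [ item { itemSizes = s } | s <- T.splitOn "," (itemSizes item) ]

main :: IO ()
main = do
  items <- runConduit $ yield (Item "T-shirt" "M,L,XL") .| splitSizes .| sinkList
  mapM_ print items  -- three items: sizes "M", "L" and "XL"
```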
> makes me think that there aren't any conduits that have more data than can fit in memory on a single machine
Just because you can dedup a conduit doesn't mean you have to. We use conduits for streaming gigabytes-large files while staying within megabytes of memory use – ensuring that is one of the main selling points of libraries like conduit: https://github.com/snoyberg/conduit#readme
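For example, the constant-memory pattern looks roughly like this (a minimal sketch; the file names and the per-chunk transformation are placeholders):

```haskell
import Conduit
import qualified Data.ByteString.Char8 as BS8
import Data.Char (toUpper)

-- Copy an arbitrarily large file while holding only one chunk in memory
-- at a time; memory use stays flat regardless of the file size.
main :: IO ()
main = runConduitRes $
     sourceFile "big-input.txt"      -- placeholder input path
  .| mapC (BS8.map toUpper)          -- any chunk-wise transformation
  .| sinkFile "big-output.txt"       -- placeholder output path
```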
Found https://utdemir.com/posts/ann-distributed-dataset.html, which makes me think that kind of distributed approach could be workable for Channable (the default existing backend spins up Lambda workers and stores intermediate results in S3), but you'd need to write a different backend that farms out work to whatever worker machines you have available.
The distributed backend part could be done via a lower-level set of distributed primitives from https://hackage.haskell.org/package/distributed-process
Cool! Is this the same thing as streams in SICP?