Parallel streaming in Haskell: Part 1 – Fast, efficient, and fun

channable.com

92 points by rkrzr 3 years ago · 19 comments


WJW 3 years ago

I enjoy programming with conduits but have somehow never come across a big enough problem to really make use of them. They encapsulate a whole class of problems in a very comprehensible and modular way. The higher-order conduits like `scanlC` and friends also enable more complicated ideas than the `map` and `reduce` variants you usually see. It's quite straightforward to model all sorts of chat-/websocket-/trading-bots as conduits, for example; I wrote a blog post about it at https://wjwh.eu/posts/2022-05-06-haskell-conduit-bots.html.
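For instance, a running total takes only a couple of lines with `scanlC` (a minimal sketch, assuming the `conduit` package, whose `Conduit` module exports `yieldMany`, `scanlC`, and `sinkList`):

```haskell
import Conduit

main :: IO ()
main = do
  -- scanlC yields every intermediate accumulator, i.e. a running total
  totals <- runConduit $ yieldMany [1 .. 5 :: Int] .| scanlC (+) 0 .| sinkList
  print totals
```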

Btw @rkrzr, I hope you don't mind me asking, but "niet geschoten is altijd mis" (a shot never taken always misses): are you looking for freelance assistance at the moment at all? I've been consistently impressed by the engineering work done at Channable, for example the Aho-Corasick work and the stuff relating to compact regions, but I am not able to commit to full-time work at the moment. Lately I've been mostly working with Ruby and DBA/db performance stuff, but Haskell is the first language I got properly good at and it will always have a warm place in my heart. I'm pretty experienced at the intersection of backend dev and DBA/devops type problems, and if you are experiencing any problems in that area I would love to help out. Looking at the Channable GitHub page, I just realized I even made some PRs for Icepeak way back in 2020. :)

  • valcron1000 3 years ago

    You don't need to use them in "big enough problems". I've used them with great success in a microservice architecture: I had to consume some events, do some processing, and spit out some data into Kafka. Absolutely straightforward code; I could just focus on the "processing" part and ignore the boring plumbing. I'm sure that a lot of apps fit this pattern really well: consume data, process it, spit out new data.
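    That shape is essentially a three-stage conduit pipeline. A minimal sketch (assuming the `conduit` package; the in-memory source and sink below are stand-ins for the real event consumer and Kafka producer):

```haskell
import Conduit

-- hypothetical stand-ins for the real event source and Kafka sink
consumeEvents :: Monad m => ConduitT () Int m ()
consumeEvents = yieldMany [1 .. 5]

-- the "processing" part, the only bit you actually have to think about
process :: Monad m => ConduitT Int Int m ()
process = mapC (* 2)

main :: IO ()
main = do
  out <- runConduit $ consumeEvents .| process .| sinkList
  print (out :: [Int])  -- prints [2,4,6,8,10]
```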

  • rkrzrOP 3 years ago

    Thanks for the kind words @WJW, we are pretty proud of our engineering culture as well :)

    We are unfortunately not looking for freelance assistance at the moment, but we regularly have Haskell Engineering positions open, so perhaps that will be interesting to you in the future.

    • WJW 3 years ago

      That's too bad but understandable in this economy. I did notice the open position on the careers page, but long term employment does not suit my situation at the moment. Best of luck and keep me in mind if the situation changes! :)

eikenberry 3 years ago

Nice to see more articles on data flow techniques. They deserve way more attention as a great whole program model for all sorts of problem domains.

still_grokking 3 years ago

Do these streams have any notion of back pressure? I can't find any info on that.

For real-world distributed systems, back pressure is very important; otherwise you could DoS yourself with a cascading overload.

  • WJW 3 years ago

    A conduit pulls data from earlier conduits in the same pipeline, requesting the next item only when it needs more input to satisfy a request for a value from a conduit later in the pipeline. This means that inside the pipeline itself there is perfect backpressure: you cannot overflow any buffers between conduits, because there aren't any.
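    You can observe that demand-driven behaviour directly: even with a huge source, only as many items are produced as the downstream actually asks for (a sketch assuming the `conduit` package):

```haskell
import Conduit
import Data.IORef

main :: IO ()
main = do
  produced <- newIORef (0 :: Int)
  -- a source that could yield a million items, but only runs on demand
  let source = mapM_ (\i -> liftIO (modifyIORef' produced (+ 1)) >> yield i)
                     [1 .. 1000000 :: Int]
  xs <- runConduit $ source .| takeC 3 .| sinkList
  n  <- readIORef produced
  print (xs, n)  -- only the three demanded items were ever produced
```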

    Of course, depending on the rest of the system you could still have a bottleneck somewhere. For example: if you pull jobs from a queue in (say) Redis but don't have enough capacity to process jobs faster than they are enqueued, the queue will eventually fill up.

    • yoricksijsling 3 years ago

      Precisely. Conduits basically do everything on demand.

      For parallel processing the backpressure becomes a bit more complicated, in particular if you end with a sequential consumer. If you only start your parallel tasks when the sequential consumer demands them, you'll find yourself still doing one task at a time. The straightforward solution is to work ahead a bit, spawning just enough parallel work to fill a small buffer that the consumer can read from. You get backpressure this way, but may have done a little too much work if the consumer doesn't want the rest of the data. We'll show this in the third blog post of the series.
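      The work-ahead idea can be sketched with a bounded queue (just an illustration using the `stm` package, not Channable's actual implementation): the producer runs ahead of the consumer, but blocks as soon as the small buffer is full, which is exactly the backpressure described above.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

main :: IO ()
main = do
  buf <- atomically (newTBQueue 4)  -- producer may work at most 4 items ahead
  _ <- forkIO $
    mapM_ (\i -> atomically (writeTBQueue buf (i * i))) [1 .. 10 :: Int]
  -- the sequential consumer pulls results at its own pace
  results <- mapM (const (atomically (readTBQueue buf))) [1 .. 10 :: Int]
  print results  -- prints [1,4,9,16,25,36,49,64,81,100]
```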

      • still_grokking 3 years ago

        > We'll show this in the third blog post of the series.

        Thanks for the pointer! Will have a look.

    • still_grokking 3 years ago

      Thanks for the explanation!

      So it's pull-based, and not push-based like most other streaming libraries.

      Does anyone know how this compares to FS2 or iteratees, then? (Both are also pull-based streaming solutions.)

      https://fs2.io/#/

      https://en.wikipedia.org/wiki/Iteratee

      Looks quite similar to me. Is the Scala FS2 lib maybe even a clone of the Haskell solution? Or are they different in important aspects?

agumonkey 3 years ago

Fantastic presentation of this side of FP/Haskell I never got to play with. Many thanks.

philsnow 3 years ago

There are conduit actions (like dedup, sort) that require all the previous action's output to be ready and available before they can start producing any output. All the discussion of GC-friendliness of caches makes me think that there aren't any conduits that have more data than can fit in memory on a single machine.

If you had conduits that were too big to fit in memory at once, would you (channable) stream them to local disk (either explicitly or just using virtual memory)? Or would you be able to distribute work between multiple machines with a cluster-aware Conduit type?

Your scheduler could split the job up into multiple machines and run the same Conduit pipeline on all the machines, and only the conduit steps that need to communicate with each other would do so.

Separate question, do you have any actions that produce more output than their input? I could imagine some customers might find it useful to generate the cartesian product of two inputs, or the power set of one input.

  • yoricksijsling 3 years ago

    > All the discussion of GC-friendliness of caches makes me think that there aren't any conduits that have more data than can fit in memory on a single machine.

    Yep! That's currently still the case. We do have some ideas to put caches on disk using mmapped files, so that you have fast access when it all fits in memory but the OS can also drop them when it wants to.

    For the moment we just use instances with 128GB memory, and those can still fit the datasets of the biggest customers that we have right now. Datasets go up to 10s of GBs, so it's not really 'big data' that needs to be distributed across machines. Due to the regular occurrence of aggregations (sort/group/window/deduplicate) that require at least _some_ synchronization, we only have small sections that can be completely parallelized. It's already a challenge to use all the cores on a single machine in an efficient manner, and I don't think we'll achieve much by using multiple machines for a single job. We've discussed a lot of this in an earlier blog post here: https://www.channable.com/tech/why-we-decided-to-go-for-the-...

    > Separate question, do you have any actions that produce more output than their input?

    Yep, we don't have cartesian products but we do have a 'split' action. Typical usage might be that a customer has a "sizes" field with values like "M,L,XL" and that they would split on that field so that a single item becomes three items for the separate sizes. The increase in the number of items is usually limited, and the increase in memory usage is even smaller because at most points during the data processing we only store the changed fields, and refer to the original item for the rest. In these cases multiple items will point to the same original item.
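    A hypothetical sketch of such a split (plain Haskell, not Channable's actual data model): one item whose "sizes" field holds "M,L,XL" becomes three items, one per size.

```haskell
-- a hypothetical item: field names mapped to values
type Item = [(String, String)]

-- split a comma-separated string (tiny helper; base has no splitOn)
commaSplit :: String -> [String]
commaSplit s = case break (== ',') s of
  (chunk, [])       -> [chunk]
  (chunk, _ : rest) -> chunk : commaSplit rest

-- splitting on a field turns one item into one item per value
splitField :: String -> Item -> [Item]
splitField field item = case lookup field item of
  Nothing  -> [item]
  Just val -> [ (field, v) : filter ((/= field) . fst) item
              | v <- commaSplit val ]

main :: IO ()
main = print (splitField "sizes" [("title", "T-shirt"), ("sizes", "M,L,XL")])
```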

  • unhammer 3 years ago

    > makes me think that there aren't any conduits that have more data than can fit in memory on a single machine

    Just because you can dedup a conduit doesn't mean you have to. We use conduits for streaming gigabytes-large files while staying within megabytes of memory use – ensuring that is one of the main selling points of libraries like conduit: https://github.com/snoyberg/conduit#readme
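    A sketch of that pattern (assuming the `conduit` package): counting the lines of an arbitrarily large file while holding only one chunk in memory at a time.

```haskell
import Conduit

main :: IO ()
main = do
  -- create a sample input file so the example is self-contained
  writeFile "input.txt" (unlines (map show [1 .. 1000 :: Int]))
  -- stream the file in chunks; memory use stays flat regardless of file size
  n <- runResourceT . runConduit $
         sourceFile "input.txt" .| linesUnboundedAsciiC .| lengthC
  print (n :: Int)  -- prints 1000
```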

  • philsnow 3 years ago

    Found https://utdemir.com/posts/ann-distributed-dataset.html, which makes me think that kind of distributed approach could be workable for Channable (the default existing backend spins up Lambda workers and stores intermediate results in S3), but you'd need to write a different backend that farms out work to whatever worker machines you have available.

joeman1000 3 years ago

Cool! Is this the same thing as streams in SICP?
