mpmc package - github.com/jussi-kalliokoski/mpmc - Go Packages

Package mpmc provides abstractions for implementing communications where messages are broadcast to multiple consumers, possibly produced by multiple producers.

type MPMC[T any] struct {
	
}

MPMC is a communication primitive for messages with Multiple Producers and Multiple Consumers. The zero value for an MPMC is ready for publishing and subscribing.

An MPMC must not be copied after first use.

Similar to locks, an MPMC is a duplex intended to be used as an implementation detail, and for most use cases its methods should not be exposed as-is as part of an API whose implementation uses MPMC.

Recommended usage is that the publisher(s) own the MPMC and provide a fitting abstraction for subscriptions. For use cases with multiple publishers, it might even be preferable to just provide the Publish method to the publishers, and have the coordinator own the MPMC.

package main

import (
	"context"
	"fmt"

	"github.com/jussi-kalliokoski/mpmc"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	subscribeCtx, subscribeCancel := context.WithCancel(ctx)
	defer subscribeCancel()
	var events mpmc.MPMC[string]
	events.Subscribe(subscribeCtx, func(mctx context.Context, msg string) error {
		// subscriptions must handle the cancellation of their own context,
		// as well as that of the publisher
		select {
		case <-mctx.Done():
			// the message publish context was canceled,
			// return cancellation to stop propagation
			return mctx.Err()
		case <-subscribeCtx.Done():
			// the subscription context was canceled,
			// return nil to allow the next subscriber
			// to be called
			return nil
		default:
			fmt.Println("msg:", msg)
			return nil
		}
	})
	if err := events.Publish(ctx, "abc"); err != nil {
		panic(err)
	}
}
Output:

msg: abc

package main

import (
	"context"
	"fmt"

	"github.com/jussi-kalliokoski/mpmc"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	subscribeCtx, subscribeCancel := context.WithCancel(ctx)
	defer subscribeCancel()
	var events mpmc.MPMC[string]
	waitCancelCh := make(chan struct{})
	events.Subscribe(subscribeCtx, func(mctx context.Context, msg string) error {
		<-waitCancelCh
		// subscriptions must handle the cancellation of their own context,
		// as well as that of the publisher
		select {
		case <-mctx.Done():
			// the message publish context was canceled,
			// return cancellation to stop propagation
			return mctx.Err()
		case <-subscribeCtx.Done():
			// the subscription context was canceled,
			// return nil to allow the next subscriber
			// to be called
			fmt.Println("msg ignored")
			return nil
		default:
			fmt.Println("msg:", msg)
			return nil
		}
	})
	subscribeCancel()
	close(waitCancelCh)
	if err := events.Publish(ctx, "abc"); err != nil {
		panic(err)
	}
}
Output:

msg ignored

Consume behaves like ConsumeAndClose except the consumerCh will not be closed.

This allows for patterns like piping multiple MPMCs to the same consumerCh.
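The reason a non-closing variant helps with piping can be sketched with plain channels. In this hypothetical example, string slices stand in for separate MPMC instances and fanIn is an invented helper. The point is only that forwarders sharing one channel must not close it; the owner closes it once, after every forwarder is done.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanIn forwards every message from each source into the shared out channel
// and returns once all sources are drained. Like Consume, the forwarders
// never close out: with several writers, a close from any one of them would
// risk a double close or a send on a closed channel.
func fanIn(out chan<- string, sources ...[]string) {
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src []string) {
			defer wg.Done()
			for _, msg := range src {
				out <- msg
			}
		}(src)
	}
	wg.Wait()
}

// collect pipes two sources into one channel and gathers the results.
// The result is sorted because the interleaving of sources is not deterministic.
func collect() []string {
	out := make(chan string)
	go func() {
		fanIn(out, []string{"a1", "a2"}, []string{"b1"})
		close(out) // only the owner closes, exactly once
	}()
	var got []string
	for msg := range out {
		got = append(got, msg)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(collect())
}
```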

func (*MPMC[T]) ConsumeAndClose

func (mpmc *MPMC[T]) ConsumeAndClose(ctx context.Context, consumerCh chan<- T)

ConsumeAndClose adds a subscriber that pushes the published messages to consumerCh.

The created subscription will eventually be removed after the provided ctx is canceled.

The provided consumerCh will be closed once the subscription has been removed.

Publish calls all active subscribers with the provided ctx and msg.

If a subscriber returns an error, Publish will not call more subscribers, but instead returns the error from the subscriber.
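The stop-on-first-error behavior can be sketched with plain functions. This is a hypothetical stand-in, not the package's implementation: subscriber, publishAll and demo are names invented here, and the sketch assumes subscribers are called one after another in a fixed order.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// subscriber mirrors the callback shape used in the examples in this document.
// The type name is hypothetical; the package may not export one.
type subscriber[T any] func(ctx context.Context, msg T) error

// publishAll is a hypothetical stand-in for the documented behavior:
// call subscribers in order and return the first error without calling the rest.
func publishAll[T any](ctx context.Context, subs []subscriber[T], msg T) error {
	for _, sub := range subs {
		if err := sub(ctx, msg); err != nil {
			return err
		}
	}
	return nil
}

// demo runs three subscribers where the second one fails, and reports
// how many were called along with the returned error.
func demo() (int, error) {
	calls := 0
	errBoom := errors.New("boom")
	subs := []subscriber[string]{
		func(context.Context, string) error { calls++; return nil },
		func(context.Context, string) error { calls++; return errBoom },
		func(context.Context, string) error { calls++; return nil },
	}
	err := publishAll(context.Background(), subs, "abc")
	return calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // the third subscriber is never called
}
```

One consequence worth keeping in mind: a subscriber that returns an error for its own local reasons prevents later subscribers from seeing the message, which is why the examples return nil when only the subscription context was canceled.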

Subscribe adds the subscriber to the list of functions to be called when a message is published.

The created subscription will eventually be removed after the provided ctx is canceled.

The subscriber MUST gracefully handle incoming messages without deadlocking even if the ctx provided to Subscribe has already been canceled.