Summary
At Zendesk we kept running into a pattern of problems with services that were consuming from busy Kafka topics. They weren’t able to reliably keep up with the rate of production, causing delays in job processing.
So we built a new event delivery platform called Event Job Distributor (EJD). It acts as an extension of Kafka topics, providing resilient and predictable event delivery to our backend services.
This is a deep dive into how we arrived at our eventual platform design, and the hard-earned lessons along the way. I promise it’s a gripping tale that will keep you on the edge of your seat!
1. Kafka (What is it good for?)
We have been using Apache Kafka for over 5 years. Our Kafka clusters are the platform that exposes our domain events and entity streams¹ internally. For example, we have a Kafka topic which contains events for every user object update in a partition/region. Our backend infrastructure often relies on these events to trigger application workflows and update local datasets.
The Kafka event bus is great for decoupling services from their domains, which has led to Kafka and entity streams becoming well adopted by teams here. But as it matured we began to uncover some non-obvious tradeoffs and corner cases.
At Zendesk we prize building software that just works. The reliability of our offering is paramount. We needed to build a platform that would smooth over some of these sharp edges and tradeoffs.
A detour into Kafka architecture
You can skip this section if you are familiar with Kafka, or you can read up on it here. Otherwise, here is a small refresher:
Kafka is a distributed log. Events are organised into topics, and each topic is split into one or more partitions. Within a partition, events are saved and read in order. Consumers pull batches of one or more events from a topic, process them, and periodically commit the position of the latest event processed back to Kafka. This position is known as the consumer offset.
Kafka is great at what it does: creating a persisted, distributed log of events. It’s exceedingly fast. It’s resilient. It’s great at maintaining order. But for the software that we are building, we needed more than this. We needed to extend the capabilities given to us by Kafka, which is why we’ve built the EJD platform.
2. Streaming and Head Of Line Blocking
Head of line blocking is a term from queuing theory. You might have seen it IRL when shopping for groceries. If a store has many customers and one cashier, the customers have to wait in line for their turn to check out. If the customer currently being served is taking too long (payment terminal issues, or perhaps just having a lengthy yarn with the server), then every other customer has to wait for the single server to be free again. In this case, the single customer who is taking too long to be served is at the ‘head’, and blocks the rest of the ‘line’.
Imagine there is a Kafka topic that logs an event for each and every user account update. Let’s call it user-updated. Every time a user updates their email, an event is produced to the user-updated topic. For every user, for every customer. That’s a lot of events. In periods of high usage we can see up to 500 per second.
In our example we have a service called service-z (Z for Zendesk, of course). Its purpose is to consume each event (in order) from the user-updated topic and perform some transformation on the event body before forwarding it onward to another service. Finally, it marks the event as processed (committed).
What happens when the number of events that are published (arrival rate) to Kafka is larger than the number of events that our consumer can pull (service rate) over a window of time?
Well, the Kafka log will handily allow the producer to run ahead of the lagging consumer. The Kafka cluster will track the most recent event produced to the topic, whilst also tracking the last event that our consumer has processed (AKA the offset). The difference between these two can be described as the ‘consumer lag’.
When the consumer lag begins to grow, the end-to-end latency of an event being consumed will increase. If your system doesn’t rely on maintaining a low processing time, this is fine. You can use Kafka to keep track of all of the events not committed and pull them off when the service nodes have capacity.
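To put some rough numbers on this, here is a back-of-the-envelope sketch in Python. The 500 events per second is the peak figure quoted earlier; the consumer rate of 400 events per second and both function names are assumptions made up for illustration, not measurements from our systems.

```python
def consumer_lag(arrival_rate: float, service_rate: float, seconds: float) -> float:
    """Events waiting in the topic after `seconds`, starting from zero lag."""
    return max(0.0, float((arrival_rate - service_rate) * seconds))


def wait_for_new_event(lag: float, service_rate: float) -> float:
    """Approximate seconds a newly produced event waits behind the backlog."""
    return lag / service_rate


# 500 events/s is the peak quoted above; 400 events/s is an assumed consumer rate.
lag = consumer_lag(arrival_rate=500, service_rate=400, seconds=60)
print(lag)                           # 6000.0 events behind after one minute
print(wait_for_new_event(lag, 400))  # 15.0 seconds of added latency
```

Even a modest gap between the two rates turns into seconds of delay within a minute, and the delay keeps growing for as long as the gap persists.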
But in our use case we do care about the event latency. We can’t have events dwelling for minutes inside of Kafka. What can we do about this? If you are familiar with Kafka you might consider adding extra topic partitions and increasing the number of service workers. However, adding extra partitions is expensive: it comes at the cost of increased Kafka complexity, and so most advise against increasing the partition count into the thousands. You could also increase the CPU/memory resources of the host machine to help with the processing of Kafka events. However, both of these options have upper bounds on how far the lever can be pulled. We needed something that would flexibly scale as our data load increased.
Our first pass at a solution to this was pretty straightforward: adding a queue (AWS SQS in our case) in between our Kafka topic and the downstream service. This shifted the backpressure from the Kafka topic onto the queue. But how does this help with decreasing the latency? When there are more events in the queue than our current processing capacity allows, we can horizontally scale the consumer workers dynamically², without the same upper bound on the number of workers, as SQS can have over 100,000 events being serviced at a given time.
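As a rough illustration of scaling on queue depth, the sketch below sizes the worker pool so that the current backlog would drain within a target time. Everything in it is an assumption for the sake of the example: the function name, the per-worker rate, the target drain time and the worker bounds are hypothetical, and this is not a description of EJD’s actual autoscaling policy.

```python
import math


def desired_workers(
    queue_depth: int,
    per_worker_rate: float,       # events per second one worker handles (assumed)
    target_drain_seconds: float,  # how quickly we want the backlog cleared (assumed)
    min_workers: int = 1,
    max_workers: int = 200,
) -> int:
    """Number of workers needed to drain the backlog within the target time."""
    needed = math.ceil(queue_depth / (per_worker_rate * target_drain_seconds))
    # Clamp to the pool's bounds so we never scale to zero or without limit.
    return max(min_workers, min(max_workers, needed))


print(desired_workers(queue_depth=6000, per_worker_rate=10, target_drain_seconds=30))  # 20
```

The point is only that the queue depth gives us a direct signal to scale on, which a fixed partition count does not.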
Should be good right? Well interestingly, this design opens us up to other issues. Enter …
3. The Noisy Neighbour Effect
Now we have a queue that contains all of our events waiting to be serviced, and a flexible, dynamic pool of workers able to respond to the depth of the queue. Naturally we have monitoring on the number of events in the queue, and we began to notice odd periods where significantly more events were entering the queue. Like, orders of magnitude more. Colour our responding engineers intrigued. We worked out that occasionally, Zendesk customers would perform a large number of actions on their entities at a single point in time. For example, imagine that an organisation tried to update all of its existing users to have a new postcode field. Or perhaps the admins are attempting to import a large amount of external data into one of our custom objects. Understandably, this will flood the user-updated topic and the queue with events from this operation. We also noted that this behaviour was quite spiky. The traffic wasn’t consistently high. It typically would subside after an hour or so, once all the events had been processed³.
As a result, the bulk of the events in the queue were from a single customer’s workload. This customer was statistically occupying more of our workers’ service capacity, resulting in other, perhaps smaller, customers being neglected. Our customers were not being serviced evenly: there was no fairness to the manner of delivery. This is often called the noisy neighbour effect.
In trying to solve this problem we came across an excellent article from AWS on queue multitenancy. This informed a lot of how we built our platform and is well worth a read!
After considering a few designs (separate queues per customer, load balancing events across multiple queues) we settled on a heuristic approach. EJD uses an adapted token bucket algorithm where it counts the number of events each customer has enqueued. When the number of events per time interval surpasses a certain threshold it sidelines their traffic to a secondary queue. It will continue to place all of the events into a ‘noisy’ queue until the number of tokens in the bucket has regenerated, and their traffic has subsided to a nominal level. At that point, it’ll move the traffic back into the regular queue.
The effect of this is that when a customer’s activity and usage spikes beyond a nominal level (flooding the queues and saturating the capacity of the service nodes), the noisy tenant is moved into a sideline queue. This allows the remaining customers to continue using the regular queue without resource contention in the workers, ideally letting their workloads be serviced in a consistent manner and avoiding those pesky spikes in latency. It is possible that multiple tenants could be moved to the sideline queue at the same time. However, it is unlikely that this would impact one tenant disproportionately⁴.
This has the advantage of normalising the latency of events as they move through EJD. The cost is that we need to maintain a separate pool of workers for the sideline queue.
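The sidelining behaviour described above can be sketched in a few lines of Python. This is a minimal, in-memory illustration and not EJD’s implementation: the class names, the capacity, the refill rate and the `resume_at` threshold are all assumptions, and a real deployment would need to share this state across nodes.

```python
import time


class CustomerBucket:
    """Token bucket state for one customer (hypothetical helper)."""

    def __init__(self, capacity: float, now: float):
        self.tokens = capacity
        self.last_refill = now
        self.sidelined = False


class Sideliner:
    def __init__(self, capacity=100.0, refill_per_second=10.0, resume_at=50.0):
        self.capacity = capacity                    # burst size tolerated (assumed)
        self.refill_per_second = refill_per_second  # nominal event rate (assumed)
        self.resume_at = resume_at                  # tokens needed to return (assumed)
        self.buckets = {}

    def route(self, customer_id, now=None):
        """Return the queue name ("regular" or "noisy") for one incoming event."""
        now = time.monotonic() if now is None else now
        bucket = self.buckets.setdefault(customer_id, CustomerBucket(self.capacity, now))

        # Regenerate tokens for the time elapsed since this customer's last event.
        elapsed = now - bucket.last_refill
        bucket.tokens = min(self.capacity, bucket.tokens + elapsed * self.refill_per_second)
        bucket.last_refill = now

        # Every event costs a token, even while sidelined, so the bucket only
        # regenerates once the customer's traffic has actually subsided.
        had_token = bucket.tokens >= 1
        bucket.tokens = max(0.0, bucket.tokens - 1)

        if not had_token:
            bucket.sidelined = True
        elif bucket.sidelined and bucket.tokens >= self.resume_at:
            bucket.sidelined = False
        return "noisy" if bucket.sidelined else "regular"
```

Each customer gets an independent bucket, so a burst from one tenant never changes the routing of another. Requiring the bucket to regenerate to `resume_at` before returning to the regular queue is one way to stop a tenant from flapping between the two queues.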
If we return to our store analogy, this would be somewhat equivalent to how some grocery stores have two groups of servers: one for shoppers with fewer than ten items, and one for customers with more items. This allows customers with only a few items to not be blocked by customers who are buying supplies for the whole week. I acknowledge this isn’t a perfect analogy: there is some subtlety missed, but I think it’s helpful nonetheless.
4. Asynchronous Event Retries
The first application that was built to use EJD was our revitalized outbound webhooks integration. Zendesk Support customers can configure and trigger payloads to be sent to an HTTP endpoint based on system events. If interested, you can read more here. From the example above, our users want to receive an HTTP request containing updates to a user object whenever the entity is altered.
The EJD platform isn’t responsible for the last mile of the network requests to our customers. Instead, it defers the construction and dispatching of the network request to a downstream service. Once an event has progressed through our queues, it reaches one of our worker nodes and we dispatch this over gRPC to our consumer service⁵. Our platform is more or less agnostic to the event and its intended purpose. Put simply, it exists as a transport.
Our customers expect that webhooks are always delivered. Therefore, EJD can’t let events be lost, deleted, ignored, or dropped. It is of utmost importance for our webhooks to always be given a chance to succeed. When an event fails, it’s reasonable for the EJD platform to attempt to retry this event. There are a number of causes that could result in a webhook request intermittently failing such as packet loss, or brief outages⁶.
In a synchronous system you can hold on to a request in memory, and retry after some short wait time. However, with our asynchronous system we don’t retry immediately, instead opting to retry after at least a second. Holding a request in memory for this long can come at a cost. For performance and utilisation reasons, it’s important that our service nodes stay occupied with processing events from the queue and delivering them to other systems. It’s expensive to have a service node wait a few seconds just for another chance to deliver an event when it could have delivered tens of events in that time. This can compound quickly. Our solution was to re-enqueue the same payload into our queues with a small delay via Amazon SQS Message Timers. This means that our queues shoulder the work of waiting for another chance to retry an event, rather than the server thread itself.
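As a sketch of what delayed re-enqueueing can look like, the snippet below uses the per-message `DelaySeconds` parameter of the SQS `SendMessage` call, which is how message timers are set. The function names, the message body layout and the doubling backoff schedule are assumptions for illustration; `sqs_client` is expected to be something like `boto3.client("sqs")`, passed in so the logic can be exercised without AWS.

```python
import json

MAX_SQS_DELAY_SECONDS = 900  # SQS caps per-message delays at 15 minutes


def retry_delay_seconds(attempt: int, base: int = 1, cap: int = MAX_SQS_DELAY_SECONDS) -> int:
    """Delay before the next try: 1s, 2s, 4s, ... (the doubling is an assumption)."""
    return min(cap, base * 2 ** (attempt - 1))


def requeue_for_retry(sqs_client, queue_url: str, event: dict, attempt: int):
    """Put a failed event back on the queue, hidden until its delay elapses.

    `attempt` is the 1-based number of the attempt that just failed.
    """
    body = json.dumps({"event": event, "attempt": attempt + 1})
    return sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=body,
        DelaySeconds=retry_delay_seconds(attempt),
    )
```

The worker returns to the queue immediately after calling `requeue_for_retry`, and SQS keeps the message invisible until the timer expires. Carrying the attempt count in the body lets a later worker decide when to give up.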
5. Circuit Breaking
There are some cases where we acknowledge that it would be a waste of our resources to try delivering an event. With webhooks, we are attempting to deliver message payloads to external customer web servers. There is a wide surface area for configuration and synchronisation drift between the outgoing request and the accepting server, which can result in the webhook failing or simply not being delivered. When these errors do happen, they typically can only be resolved by the external service owner addressing the issue. So the default strategy of retrying 3–5 times would not end with a successful request. To combat this, EJD provides circuit breaking for these events. If a downstream consumer service is constantly reporting failure of its workloads, we’ll ‘break’ the circuit and not attempt delivery for some time.
The algorithm for circuit breaking is inspired by the Hystrix circuit breaker. An interesting challenge was adapting it to share failure and success counts across multiple distributed service nodes (as opposed to the common approach of tracking them only for a single instance⁷), because any service node can receive events from any customer’s workload. A Redis sorted set using event timestamps as the scores was suitable for maintaining a record of successes and failures.
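To show the shape of the sliding-window bookkeeping, here is a small in-memory sketch. It is not our production code: two sorted Python lists stand in for Redis sorted sets so the example runs on its own, and the class name, window length, minimum request volume and failure threshold are all assumed values. In Redis the same steps would map onto commands such as ZADD, ZREMRANGEBYSCORE and ZCARD, with one key per customer so that every node sees the same counts.

```python
import bisect


class SlidingWindowBreaker:
    def __init__(self, window_seconds=10.0, min_requests=20, failure_threshold=0.5):
        self.window_seconds = window_seconds
        self.min_requests = min_requests            # don't trip on a handful of events
        self.failure_threshold = failure_threshold  # fraction of failures that opens it
        self._successes = []  # sorted timestamps, standing in for a Redis sorted set
        self._failures = []

    def record(self, ok: bool, now: float) -> None:
        """Store the outcome of one delivery, scored by its timestamp."""
        bisect.insort(self._successes if ok else self._failures, now)

    def _count_recent(self, timestamps, now: float) -> int:
        # Drop entries older than the window, then count what remains.
        cutoff = now - self.window_seconds
        del timestamps[: bisect.bisect_left(timestamps, cutoff)]
        return len(timestamps)

    def is_open(self, now: float) -> bool:
        """True when deliveries should be skipped for this circuit."""
        failures = self._count_recent(self._failures, now)
        total = failures + self._count_recent(self._successes, now)
        if total < self.min_requests:
            return False
        return failures / total >= self.failure_threshold
```

Because old outcomes fall out of the window on their own, the circuit closes again once the failures age out, without any node having to reset it explicitly.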
Our design places the circuit breaker just before the request is dispatched, after the event has dwelled in the queue. Why not circuit break prior to placing events into the queue (like we do with traffic sidelining)? It’s so the system can accommodate cases where the dwell time inside the queue is larger than expected. The percentage of events that are deemed successful can shift in under a second if the traffic load is high enough. So it’s most accurate to check if a circuit is open as close to dispatch as possible.
Having a way to circuit break a workload which is consistently failing (and adding extra traffic through retries) goes a long way to reduce unnecessary load on our platform, and our downstream consumers.
6. Request Throttling
So far our system has taken the approach of increasing the compute horizontally in order to reduce the end-to-end latency. By adding more nodes to the pool, we give events more of a chance of being serviced by an available worker. This does mean that at a given time there are more events concurrently being serviced. But just because the EJD platform is able to handle a certain level of concurrent requests doesn’t mean that our downstream services can. We want to avoid situations where we overwhelm the capacity of our users.
EJD offers consumers a way to limit the number of events that are dispatched from our platform. We let them configure the number of requests they want to receive in a specific time period.
We count the number of outbound requests being sent in Redis (similar to the circuit breaker). If service-z receives an event from the queue and the consumer has exceeded the threshold for that customer, the event will be marked as throttled and placed into another separate queue. This queue will be serviced by a separate, smaller group of workers. As there are fewer workers in this pool, the rate of processing events will be lower, giving events in the main queue more of a chance of getting delivered.
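A fixed-window counter is one simple way to implement this kind of limit, sketched below. Treat it as an assumption-laden illustration rather than our exact algorithm: the class name, queue names and limits are hypothetical, and a plain dictionary stands in for the Redis counters (which would typically be incremented with INCR and given an expiry).

```python
class FixedWindowThrottle:
    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit                    # requests allowed per window (configured)
        self.window_seconds = window_seconds
        self._counts = {}                     # consumer_id -> (window index, count)

    def route(self, consumer_id: str, now: float) -> str:
        """Return "dispatch" while under the limit, otherwise "throttled"."""
        window = int(now // self.window_seconds)
        current_window, count = self._counts.get(consumer_id, (window, 0))
        if current_window != window:
            count = 0  # a new window has started, so the counter resets
        count += 1
        self._counts[consumer_id] = (window, count)
        return "dispatch" if count <= self.limit else "throttled"


throttle = FixedWindowThrottle(limit=2, window_seconds=1.0)
print([throttle.route("consumer-a", now=t) for t in (0.1, 0.2, 0.3, 1.1)])
# ['dispatch', 'dispatch', 'throttled', 'dispatch']
```

An event routed to "throttled" is not dropped. It is placed on the slower queue and still gets delivered, just later.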
7. Platform Vision
We made the conscious decision early in the product’s life to make sure that EJD would be a platform that powers other applications. From this article, you can see that there is a lot involved in building quality into delivery. It would be expensive for every team to build and maintain their own stack. Instead, the EJD team can do it on their behalf: consumers can then treat our systems as a black box that simply ensures event delivery. There is the added benefit that when we upgrade an aspect of our service, the improvement reaches all consumers without any extra work. What are some of the expansions that we are thinking about?
One idea that often percolates in our discussions is giving the platform the ability to re-process events that occurred in the past. Imagine that a customer’s webhook server suffered from an outage overnight. Wouldn’t it be great to be able to replay all of those events that were missed during the outage?
We would also like to dip into providing a better attempt at ordering events as they move through EJD. Ordering is a complex topic which has plenty written on it. But the short story is that we may be able to use SQS FIFO offerings to give us a slightly stronger ordering guarantee. This would come at the cost of event retries, and potentially queue dwell times.
In our design you can see that we have been somewhat glib about the potential for multiple deliveries of the same event. This is a natural trade-off that results from using retries. However, some of our consumers can’t tolerate an event being processed twice. To help our consumers deal with this issue, we could hash the contents of the event and provide that hash to the consumer. Our consumers can then store a list of the processed hashes and ignore any hash they’ve seen before.
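Since this is only an idea we are considering, the following is a purely hypothetical sketch of how it might look. The function and class names are invented, SHA-256 over canonical JSON is an assumed choice of hash, and a real consumer would keep the seen hashes in a durable store rather than an in-memory set.

```python
import hashlib
import json


def event_fingerprint(event: dict) -> str:
    """Hash the event contents; sorted keys make the hash independent of key order."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class DedupingConsumer:
    def __init__(self):
        self.seen = set()     # fingerprints already processed
        self.processed = []

    def handle(self, event: dict, fingerprint: str) -> bool:
        """Process the event once; return False for a repeat delivery."""
        if fingerprint in self.seen:
            return False
        self.seen.add(fingerprint)
        self.processed.append(event)
        return True
```

One caveat with hashing contents alone is that two genuinely separate events with identical payloads would collapse into one, so the payload would need to carry something unique, such as an event ID or timestamp.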
Key to the success of our platform is giving our users the ability to observe and audit their workloads after processing. EJD provides automatic logging of all delivery attempts, failures and successes, which can then be extracted via an API. It’s quite primitive and in the future we’d want to look at providing features like stat aggregation, and pushing errors back to our consumers.
8. Conclusion
This article outlines just some of the functionality needed to build a robust event delivery platform. We found the same pattern of applications needing a certain level of delivery quality from event streams. There is enough complexity to justify the creation of a platform that offers this out of the box. Giving our consumers all of this operational functionality (throttling, circuit breaking, and traffic fairness) allows the application developers to focus on building the core of their application logic. This does come with a trade-off: a slight increase in latency (somewhere in the range of 100 to 200ms) and a relaxation of delivery guarantees. But in talking with our internal stakeholders, this is a tradeoff we are confident in making.
I’m hoping that this post has painted a clear picture of the systems and tradeoffs that led to us building our new event delivery platform, and that you’ve learnt from some of our hard-earned lessons 💪
Glossary
Queue
Queues are a crucial lego piece to the lego building that is EJD. It’s an abstraction used to describe a temporary datastore where services can push events onto the queue for other services to pull off. Once an event is pulled off it is deleted from the queue.
Events
A payload of data (serialised in JSON or protobuf) that is ingested into EJD and delivered to our downstream consumers.
Noisy Neighbour Effect
When a single tenant/user in a multi-tenant system has a load/usage profile that impacts other tenants
Dwelling
When an event has been “pushed” to a queue but not yet “popped” and serviced.
Saturation
Describes compute nodes that are fully consumed by a workload, to the point that they cannot service any arriving events.
Throttling
Restricting the number of events that are being serviced in our system so that we can protect downstream services.
Footnotes
[1] At Zendesk we use Entity Streams to describe compacted Kafka topics, where the contents of the compacted event is the most recent state of an object/entity.
[2] When Kafka consumer groups increase their node count it can introduce a small rebalancing latency.
[3] And often these spikes would happen exactly on the hour. Almost as if it was triggered by a cron!
[4] Initially we also discussed having a secondary sideline queue, in case there was enough traffic in the first sideline queue that one or more customers needed to be isolated again. But in practice we haven’t found the congestion periods to overlap enough to warrant this. Perhaps we’ll revisit this in the future though.
[5] At Zendesk, we use protobuf descriptors for our Kafka schemas, which makes using protobuf + gRPC for our interservice requests a natural fit.
[6] But some errors are unlikely to succeed on subsequent retries (think, missing authentication, invalid request formatting). So we let our consumers tell us which events are worth retrying.
[7] The reason for this divergence is that we need to be able to apply limits and restrictions as the events exit EJD, not as they enter the consumer’s service. A similar principle applies to our throttling approach.
Thank you to everyone that helped revise and edit this paper ❤️