At Strava, we love maps — some of our most loved features are nestled on map surfaces. My team, the Geo team, is focused on building and improving these products. On the Geo and Metro teams, we tend to work with large datasets: aggregations of open source map data from OpenStreetMap, GPS data points from uploaded activities, third-party datasets for properties like elevation, and beyond. These aggregated datasets eventually turn into the Geo features we know and love, like the global heatmap, Strava Metro, the routing product, route suggestions, elevation profiles, and points of interest. We perform these data aggregations in a rather hefty data pipeline, run on a regular cadence to ensure we’re serving up-to-date geo data.
One of the Geo team’s key challenges is efficiently serving large, immutable (write-once, read-many) datasets produced by our pipeline. This is particularly hard for compute-intensive use cases like routing, where:
- Write-Optimized vs. Read-Optimized Conflict: Traditional read-optimized data stores struggle with large batch writes without impacting read performance or introducing significant operational complexity.
- Cost Constraints: Storing rarely accessed datasets in production databases can be prohibitively expensive — especially for projects like Strava Metro, which are accessed sporadically.
- Schema Complexity: Defining schemas outside the service that uses them can be costly and inflexible for developers.
Previously
Our previous solution to large writes used a combination of datastores: PalDB and Cassandra.
PalDB is an embeddable, write-once key-value store that works well for small datasets. Its README states that it is optimized for “side data”: relatively small datasets you read “in passing” on your service. PalDB is less suited, however, to larger datasets. In our case, since each service deployment required downloading the key-value file from S3, our deployments were taking upwards of twenty minutes. Slow deployments hindered rapid iteration for developers. Additionally, the collection of PalDB files occupies a hefty amount of memory on every service instance. For an important service like Routemaster at Strava, this means we end up duplicating the data in memory across potentially dozens of server instances.
Cassandra is advertised as being optimized for write-heavy workloads, and it can be an effective solution for batch data outputs. Though Strava does still use it as storage for some batch datasets, we can run into problems when we’re replacing the full dataset on a regular cadence. In our experience, Cassandra writes from Spark require babysitting to ensure we don’t hit throttling, network, or connection limits. Since our builds are full replacements incorporating worldwide changes to the OpenStreetMap source, this can cause high load on a production datastore.
Introducing Rain
We embarked on a journey to improve our immutable batch data updates by creating a new service that acts as a key-value store for any dataset generated in Spark. We call this service Rain, because it distributes data from the cloud onto client services.
At its core, Rain behaves analogously to the cache in your operating system, except at a distributed-systems level. Just as the OS loads file blocks into cache, Rain loads subsets of your Spark-output dataset on S3 (“the filesystem”) into Redis (“the L1/L2 cache”). Client services retrieve the datasets via a rich client library that calls the Rain service. In code, Rain has three primary components: a Spark writer API, a library for reads, and a Thrift service, shown below:
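To make the shape of these three pieces concrete, here is a minimal sketch; the trait names and signatures are illustrative assumptions rather than Rain’s actual API:

```scala
import org.apache.spark.rdd.RDD
import com.twitter.util.Future

// 1. Spark writer API: persists a key-value RDD as an immutable Rain table in S3.
//    (Hypothetical interface, for illustration only.)
trait RainWriter {
  def writeTable[K, V](tableName: String, data: RDD[(K, V)], numPartitions: Int): Unit
}

// 2. Read-side client library: serializes keys, calls the Rain service, and
//    deserializes the returned bytes into the caller's types.
trait RainClient {
  def get[K, V](tableName: String, key: K): Future[Option[V]]
}

// 3. Thrift service: resolves raw byte keys against Redis, falling back to S3.
trait RainService {
  def lookup(tableName: String, keyBytes: Array[Byte]): Future[Option[Array[Byte]]]
}
```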
What does this structure enable? First, it allows us to tap into the power of Spark’s distributed writes, which are well optimized for Parquet output. Second, it allows us to hot-swap immutable data by simply changing the reference path for the dataset, which is provided through an admin interface. Third, it helps us save on data costs by using an LRU (least recently used) Redis cache and a single distributed datastore rather than a duplicated in-memory store on each service instance.
Writing a Rain table
Strava uses Apache Spark to run our data pipelines and write Rain tables. For datasets we update regularly, we typically schedule refreshes with Apache Airflow. We write the data for a fully refreshed table to a clean prefix in S3, then update a small pointer file so that it references this new prefix.
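A minimal sketch of this write-then-repoint flow is below; the bucket layout, pointer format, and source dataset are assumptions for illustration, not our actual pipeline code:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Build the fully refreshed key-value dataset (illustrative source and columns).
val refreshed = spark.read
  .parquet("s3://geo-pipeline/routing-edges/latest")
  .select("key", "value")

// 1. Write the full table to a clean, versioned prefix.
val version = System.currentTimeMillis()
val prefix = s"s3://rain-tables/routing-edges/v$version"
refreshed.write.parquet(prefix)

// 2. Repoint the table by overwriting the small pointer object with the new prefix.
//    (Shown here as a plain-text S3 object; Rain's real pointer format may differ.)
val s3 = AmazonS3ClientBuilder.defaultClient()
s3.putObject("rain-tables", "routing-edges/POINTER", prefix)
```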
Rain is responsible for regularly checking whether this data pointer has been updated to reference new table data. At the Rain service level, we use a Caffeine cache to expire the cached pointer to the old dataset. This lets us replace the immutable dataset in the background, without redeploying any component of the system. The table pointer update is simply an API call to the Rain server, which then updates the pointer. When Caffeine expires the cached pointer to the old dataset, Rain starts reading data from the new dataset.
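Here is a rough sketch of what that pointer cache could look like on the Rain server, assuming a hypothetical loadPointerFromS3 helper and an illustrative expiry window:

```scala
import com.github.benmanes.caffeine.cache.{Caffeine, LoadingCache}
import java.time.Duration

// Hypothetical helper: reads the pointer object for a table from S3 and
// returns the S3 prefix it currently references.
def loadPointerFromS3(tableName: String): String = ???

// The Rain server caches each table's data pointer briefly, so a pointer
// update in S3 is picked up within the expiry window without any redeploy.
val pointerCache: LoadingCache[String, String] =
  Caffeine.newBuilder()
    .expireAfterWrite(Duration.ofMinutes(5)) // illustrative TTL
    .build[String, String]((tableName: String) => loadPointerFromS3(tableName))

// Every read resolves the table through the cached pointer; once the entry
// expires, the next read re-loads the pointer and starts hitting the new prefix.
def currentPrefix(tableName: String): String = pointerCache.get(tableName)
```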
Polymorphism
Key/value pairs in a Rain table can consist of arbitrary Kryo-serializable case classes. This means that services can be responsible for defining their own schemas, without involving a database administrator. We write, store, and transfer data as byte arrays, while clients define the key/value types and ensure they are Kryo-serializable. The Spark write client handles object serialization, and the reader in the client library handles deserialization.
The client library is responsible for deserializing the byte array results that Rain returns into types that the client can use.
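As a rough sketch of that round trip, the example below defines a pair of illustrative case classes and serializes them with plain Kryo; the types, registration, and instantiator strategy are assumptions, not Rain’s actual serialization setup:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import org.objenesis.strategy.StdInstantiatorStrategy
import java.io.ByteArrayOutputStream

// Illustrative types a client service might define; Rain itself only ever
// sees the resulting byte arrays.
case class EdgeKey(edgeId: Long)
case class EdgeValue(lengthMeters: Double, surface: String)

val kryo = new Kryo()
// Allow instantiating classes (like case classes) that lack no-arg constructors.
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
kryo.register(classOf[EdgeKey])
kryo.register(classOf[EdgeValue])

// Serialize a key or value into the byte-array form that is stored and transferred.
def toBytes(obj: AnyRef): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val output = new Output(baos)
  kryo.writeObject(output, obj)
  output.close()
  baos.toByteArray
}

// Deserialize the bytes Rain returns back into the client's type.
def fromBytes[T](bytes: Array[Byte], cls: Class[T]): T =
  kryo.readObject(new Input(bytes), cls)
```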
How do we perform schema changes on a Rain table? A schema change generally requires a service deployment anyway, so it is enough to write the data under a new Rain table name and have the redeployed service read from that new table.
Partitioning
“Writing a Rain table” simply means writing uniformly sized Parquet files to S3, where the schema for the Parquet data is a key-value tuple. We use the basic Spark Parquet write API to perform this operation. To write these uniformly sized files, we assign a partitioner in Spark and perform a `.repartition` operation.
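A minimal sketch of that write path follows, reusing the hypothetical EdgeKey/EdgeValue types and toBytes helper from the Kryo example above; the partition count, paths, and use of `partitionBy` are illustrative assumptions:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Source pairs produced by the pipeline (illustrative placeholder).
val records: RDD[(EdgeKey, EdgeValue)] = ???

// Chosen so that each output Parquet file ends up roughly the same size.
val numPartitions = 1024

records
  .partitionBy(new HashPartitioner(numPartitions)) // or a RangePartitioner with a key Ordering
  .map { case (k, v) => (toBytes(k), toBytes(v)) } // serialize to the stored byte-array form
  .toDF("key", "value")
  .write
  .parquet("s3://rain-tables/routing-edges/v123")
```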
We partition by key, using either a hash partitioner against the key or a range partitioner based on a predefined Ordering of the key. Hash partitioning distributes keys evenly across all partitions based on a hash function. Range partitioning ensures that keys that are “close” to each other under the ordering land on the same partition. Hash partitioning keeps skew to a minimum, while range partitioning offers cache-locality benefits for certain read patterns, which can reduce the number of read queries.
For some datasets, range partitioning makes more sense because the data is often loaded in ranges. For example, routing edges in Strava’s routebuilder are keyed by ID, and edges that are geographically close tend to have IDs that are close together. This makes the routing edges dataset a good candidate for range partitioning. The flip side is that certain keys may be “hotter” than others at read time. The performance penalty of a hot key, however, is fairly minimal with a cache store like Redis.
The Spark write client and the Rain read client need to use the same partitioner to stay in sync. For hash-partitioned tables, this is fairly straightforward: we use the default hash partitioner in Spark. However, for range-partitioned tables, we need to write a separate index that the read client loads to tell the Rain server which partition to search for a key. Since deserialization happens on the client via the rich Rain client library, computing the ordering also needs to happen on the client. That adds one more responsibility when registering classes with the Rain client library: it has to define an ordering for the keys. A sketch of how a read client can map a key to a partition is shown below.
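This is a rough sketch of the key-to-partition mapping on the read side; the hash logic mirrors Spark’s default HashPartitioner, and the range index format (a list of per-partition upper-bound keys) is an assumption for illustration:

```scala
// Hash-partitioned tables: Spark's default HashPartitioner places a key in
// partition nonNegativeMod(key.hashCode, numPartitions), which we can mirror here.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod
}

// Range-partitioned tables: find the first partition whose upper bound is at
// least the key, using the per-partition upper bounds written out as a small
// index alongside the table and the client-defined Ordering for the key type.
def rangePartition[K](key: K, upperBounds: IndexedSeq[K])(implicit ord: Ordering[K]): Int = {
  val idx = upperBounds.indexWhere(bound => ord.lteq(key, bound))
  if (idx >= 0) idx else upperBounds.length // keys past the last bound go to the final partition
}
```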
Reading
Reading from Rain is fairly straightforward. The client library takes the object key, serializes it into bytes, and makes a request to the Rain service, which looks the key up in Redis or S3. If the key is found in Redis, we return its value; if not, we load the S3 file that contains that key. The data is returned in byte-array form, and the Rain client again deserializes the bytes into an object that makes sense for the calling service.
Internally, there are a few bookkeeping constructs to ensure good performance for concurrent reads. If there are concurrent requests for the same key, we use ThriftMux consistent hashing to make sure that we only load the S3 file into Redis once. Consistent hashing ensures that only one service instance is responsible for loading the S3 file associated with a key. We then use a singleton Caffeine cache on each service instance to keep track of which requests to S3 are currently in flight. Conveniently, an async Caffeine cache implicitly provides a clean locking mechanism for these file loads.
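A simplified sketch of that read-through flow on a Rain server instance might look like the following; the Redis and S3 helpers are hypothetical stand-ins, and the expiry window is illustrative:

```scala
import com.github.benmanes.caffeine.cache.{AsyncCache, Caffeine}
import java.time.Duration
import java.util.concurrent.{CompletableFuture, Executor}

// Hypothetical helpers standing in for the real Redis and S3 integrations.
def readFromRedis(table: String, keyBytes: Array[Byte]): CompletableFuture[Option[Array[Byte]]] = ???
def fileForKey(table: String, keyBytes: Array[Byte]): String = ??? // partition -> S3 object
def loadFileFromS3IntoRedis(path: String): CompletableFuture[java.lang.Boolean] = ???

// Tracks in-flight S3 file loads; concurrent requests for keys in the same
// file collapse onto a single load.
val inflightLoads: AsyncCache[String, java.lang.Boolean] =
  Caffeine.newBuilder()
    .expireAfterWrite(Duration.ofMinutes(1)) // illustrative
    .buildAsync[String, java.lang.Boolean]()

def lookup(table: String, keyBytes: Array[Byte]): CompletableFuture[Option[Array[Byte]]] =
  readFromRedis(table, keyBytes).thenCompose { (cached: Option[Array[Byte]]) =>
    cached match {
      case Some(value) => CompletableFuture.completedFuture(Option(value))
      case None =>
        val file = fileForKey(table, keyBytes)
        inflightLoads
          .get(file, (path: String, _: Executor) => loadFileFromS3IntoRedis(path)) // one load per file
          .thenCompose((_: java.lang.Boolean) => readFromRedis(table, keyBytes))    // retry Redis after the load
    }
  }
```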
We also do some bookkeeping on the Rain server so that we can lazily refresh full immutable tables.
The Cache
There are several levels to the cache in Rain. S3 is the slowest read, followed by Redis, followed by Caffeine cache directly on the service client. We ideally want to have as much data as possible as close to the service client as possible, but, of course, we operate in a world with limited resources. Each service has its own SLA and resourcing requirements. Rain is designed to be flexible to each of these constraints.
As with any caching system, heterogeneity in read patterns helps Rain provide good performance. The ideal distribution of data stored in a Rain table has both frequently accessed and infrequently accessed data. Storing data in ordered, partitioned tables helps increase this diversity of access SLAs, since some partitions in an ordered table will be accessed far less frequently than others. For example, routing data on roads where it is currently nighttime (and therefore few users are active) does not need to be cached. Another example: we generally should not need to cache satellite imagery of zoomed-in parts of the ocean (and we expect that data to be evicted first), whereas it makes sense to keep an image of Central Park in the cache. Variation in retrieval SLAs within a cache reduces the number of cache evictions.
Basically, we can happily let the dataset exceed the storage size of the cache, as long as high-usage data doesn’t fill the whole cache and cause eviction churn.
Local mode
Rain unlocks the ability to run local servers against production datasets. We can set up local runs of service instances to point at a Rain server. Practically, this means we can avoid loading the entirety of a dataset into memory for local runs, meaning local service instances come up much more quickly.
When running in local mode, Strava engineers can pick and choose which Rain tables they want to load from the remote Rain service, and which Rain tables they want to interact with directly. We introduced a Rain table mode that uses Caffeine for local caching only. One use case for this mode is when developers are iterating locally on a job that outputs to a Rain table: the developer can write to the Rain table locally in a Spark run and read it locally without having to productionize their dataset. On the Geo team, we use these local Spark runs to output data specific to a certain region of the world, which is helpful for QA when developing new features.
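Purely as an illustration of what per-table mode selection could look like, here is a small sketch; the trait, objects, and table names are hypothetical, not Rain’s actual configuration:

```scala
// Hypothetical per-table mode switch for local development.
sealed trait RainTableMode
object RainTableMode {
  case object Remote        extends RainTableMode // read through the shared Rain service
  case object LocalCaffeine extends RainTableMode // keep the table in a local Caffeine cache only
}

// A developer might point most tables at production data while iterating on
// one experimental table entirely locally.
val tableModes: Map[String, RainTableMode] = Map(
  "routing-edges"    -> RainTableMode.Remote,
  "experimental-poi" -> RainTableMode.LocalCaffeine
)
```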
Admin UI
Of course, with a tool like Rain, it’s important to have visibility into which datasets are being used, to investigate cache metrics, and to understand data access patterns for certain datasets. We built an internal UI for this that lets us perform immutable table refreshes, investigate access patterns, warm our Redis cache, and clean up our Redis cache.
Below is a screenshot from the admin UI which shows a distribution of the access times of keys by table in Redis.
Cons
One downside we’ve seen is an increase in cross-AZ network usage, but the increased spend there is offset by the savings from service memory reduction. We had budgeted for small increases in latency, but we soon discovered that Rain and Redis reads caused little to no latency bump in client service responses.
Another downside we found was a challenging migration from synchronous data retrieval to Future-based asynchronous retrieval of Rain table data. Finagle Thrift client responses are always asynchronous, whereas PalDB calls are always synchronous, so our move to Rain meant that several workflows needed to be refactored to use Futures. This required a bit of thread pool tuning and a better understanding of concurrency control on client services. Some algorithms (e.g. A* graph search) that require a large number of serial steps, each needing a data lookup, are inherently hard to migrate.
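The shape of that migration looks roughly like the sketch below; the lookup signatures are hypothetical, but the Future type is Finagle’s com.twitter.util.Future:

```scala
import com.twitter.util.Future

// Before: a blocking, PalDB-style lookup that could be called inline,
// one key at a time, inside a loop or algorithm. (Hypothetical signature.)
def elevationSync(edgeId: Long): Double = ???

// After: a Finagle Thrift call returns a Future, so callers compose with
// map/flatMap or batch their keys instead of blocking on each lookup.
def elevationAsync(edgeId: Long): Future[Double] = ???

// Example of a refactored call site: fan out the lookups and combine them.
def totalElevation(edgeIds: Seq[Long]): Future[Double] =
  Future.collect(edgeIds.map(elevationAsync)).map(_.sum)
```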
Provisioning a large Redis cluster has a cost, too. By our calculations, this is still offset by our savings on cloud computing cost.
Conclusion
Rain is still young, but so far, we’ve seen quite a lot of improvements:
- We’ve seen faster iteration on deployments (from 20 minutes down to 1 minute of deploy time) and enabled local mode for large datasets. We hope Rain simplifies how teams at Strava approach caching going forward.
- We reduced the memory footprint of some of our largest services by orders of magnitude.
- We saved tens of thousands of dollars in EC2 computing costs.
- We’ve enabled immutable table refreshes in the background.
- We’ve enabled distributed writes of immutable datasets from Spark.
Overall, Rain is a big step forward in the way that Strava can serve large datasets. The biggest benefit we get from Rain is that it lets us serve terabyte-scale Spark-derived datasets at low latency. This is huge for our product roadmap, and I’m excited to see what we’ll do with it.
Interested in joining Strava? See the careers page.