Press enter or click to view image in full size
Overview
At Airtable, we store our application or “base-scoped” data on a number of sharded MySQL instances in Amazon’s Relational Database Service (RDS). Each Airtable base is associated with a single one of these sharded instances, and as the base and the data in the base changes, we store some append-only data associated with the history of the base. This data powers features such as undo and revision history, allowing us to display record revisions as far back as 3 years ago for our enterprise customers. As we have grown as a business, these append-only tables have become increasingly large and their content now represents close to half of all the data that we store at Airtable. Additionally, much of this data is very infrequently accessed, but is stored in the same storage layer as all of our base-scoped data for every day product use making it expensive.
The Project
The Live Shard Data Archive (LSDA) project allowed us to shrink the disc volumes of the bulk of our RDS instances by taking this infrequently accessed, append-only data and storing it in a cheaper storage solution, S3. Once it was stored in that cheaper solution, we were able to drop the old data, and rebuild the RDS instances to reclaim the space.
Moving this data to S3 required three major steps. First, we had to archive and transform the data from RDS into S3 such that it could be accessed by our codebase in an efficient and consistent way. Second, we had to validate that this archived data matched the existing source data in RDS, and that our process of archiving did not cause any inconsistencies between the two datasets. Finally, we had to make application code changes to serve this data from S3. After these steps, we were able to truncate the data from RDS that we were serving from S3, allowing us to shrink the allocated storage space of these instances, saving a substantial amount of our overall RDS bill. This blog post will focus on the first phase of the second step of that process, data validation.
Archiving and Transformation
The data archival from RDS into its final shape in S3 was a three-step process. First, we exported snapshots of our databases into S3 as parquets. This is a built-in RDS feature. To optimize query latency of the archive, we then repartitioned the snapshot exports by our customers. This was a challenging process due to the amount of data (>1PB) and the number of files involved (>10M). We used Apache Flink to incrementally ingest these files and repartitioned them into per customer partitions. Finally, we ran a highly-concurrent Kubernetes rewriter job that sorted the archive for each customer, and added the proper index and bloom filters to the rewritten parquets to speed up the most common query patterns.
Validation overview and considered approaches
Validating the data required us to do a row by row comparison from the archive to our source data and make sure that for every row, these values were equal. Naively, an easy way to do this would be to read a row from our archive, find that row in RDS, and confirm they are the same. However, we were dealing with almost 1PB of data and close to 2 trillion rows. Additionally, our RDS instances in production serve customer traffic, so saddling them with these additional requests was not really an option, especially at the volume we would require to validate our entire archive. As a result, we decided to use the original, unmodified RDS export as our source data for this validation project. This data was stored in S3, and while that alleviated the problem of querying serving instances, it would simply be too slow to go row by row and validate this data. Ultimately, we decided that if we had all of the data in some relational database, we could just join the two tables together, and find any discrepancies in the data that way.
Leveraging StarRocks for Data Validation — Airtable Data Infrastructure Team
For the data validation project, the Data Infrastructure team helped the Storage team in selecting the best tool to complete the validation work efficiently.
Why Use StarRocks to Address the Data Validation Problem?
The core of the data validation problem involves performing a large number of join operations between two massive datasets, each containing nearly a trillion rows of data. The primary challenge was executing these computationally intensive join operations efficiently.
After thorough investigation, StarRocks was chosen due to its exceptional join performance. It can handle these operations with affordable computational costs, whereas other query engines struggle significantly with the same workload.
To address the problem, we decided to load raw Parquet files from S3 into local tables in StarRocks. By leveraging StarRocks’ colocation mechanism, we could efficiently perform the join operations required for data validation.
StarRocks Architecture
Press enter or click to view image in full size
The diagram above illustrates the StarRocks architecture, which can access the following data sources:
- Data Lakes on S3: Includes Hudi Lake, Delta Lake, Iceberg Lake, and Paimon Lake.
- Native Format on S3: StarRocks allows the creation of tables that persist data directly in S3 using its native format.
- Raw Parquet, JSON, or CSV Files on S3: Queries can be executed directly on raw Parquet, JSON, or CSV files stored in S3.
In our specific scenario, we loaded raw Parquet files from S3 storage into StarRocks’ local tables to perform data validation, as highlighted above.
Ingestion Optimization: Enhances data ingestion performance in StarRocks
We had to load nearly 1 trillion rows of data from raw Parquet files into StarRocks local tables. The dataset consisted of hundreds of millions of small Parquet files. Without proper optimization and parallelization, ingesting the entire dataset would have taken several months.
To accelerate the ingestion throughput, we implemented the following optimizations:
- Reduce the Number of Replicas (from 3 to 1):
Since this is a one-time validation task, maintaining high availability for production is unnecessary. Reducing the number of replicas significantly decreases the total data volume to be ingested. - Increase Internal Ingestion Parallelism:
As the validation process involves ingestion first, followed by join-based validation, ingestion performance does not affect serving scenarios. We increased parallelism by tuning the following parameters:
- pipeline_dop
- pipeline_sink_dop - Increase the Number of Buckets per Partition:
Given the large data volume, we ensured that each bucket contained no more than 5GB of data. Increasing the number of buckets per partition significantly improves ingestion throughput. Although this may cause compaction to lag behind, it is not a concern in our specific scenario.
These optimizations collectively help to efficiently handle the massive data ingestion process required for our validation workload.
Ingestion
Export
Once StarRocks was set up, we needed to get all of the data ingested from both our RDS export and the transformed archive which we planned on serving from. We decided that it was not cost and time efficient to store all ~1PB in StarRocks, so we decided to just hash all of the non-key columns in the table. We ingested two tables as a part of this process, but our examples will focus primarily on just one, _actionLog.
Initial solution: Simple table with hashed non-key columns
We started with a table schema for each of our tables which looked like this:
CREATE TABLE `_rdsExportActionLog` (
`id` bigint(20) NOT NULL COMMENT "",
`application` varchar(65533) NOT NULL COMMENT "",
`hash_value` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`, `application`)
DISTRIBUTED BY HASH(`id`, `application`)
ORDER BY(`application`)
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "action_log_group",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "ZSTD"
);And we started to load the data using insert statements like this
INSERT INTO \`exportActionLog\`
WITH LABEL ${label}
(id, application, hash_value)
SELECT id, application, XX_HASH3_64(CONCAT_WS(',',
<columns>)) as hash_value
FROM FILES(
"path" = "s3://${bucket}/${folder}*.parquet",
"format" = "parquet",
);Data distribution and Loading Bottlenecks
However, we found that this loading operation took a really long time, on the order of almost 1 day to load two of our shards. We found that as we added more data and the table got bigger, our ingestion rate slowed further. What was also odd is that if we increased the local parallelization of this ingestion, i.e. ingested multiple shards at once, we didn’t see almost any performance boost, and when we set this value to be more than five, we saw a lot of this:
JobId: 14094
Label: insert_16604b11–7f2d-11ef-888c-46341e0f370e
State: LOADING
Progress: ETL:100%; LOAD:99%
Type: INSERTYou can see here that our load value is 99%, but a number of these large loads would just get stuck at this value and not be able to progress past this state quickly despite getting to 99% quickly. Per the StarRocks documentation, “When all data is loaded into StarRocks, 99% is returned for the LOAD parameter. Then, loaded data starts taking effect in StarRocks. After the data takes effect, 100% is returned for the LOAD parameter.” Evidently we were experiencing some bottlenecks on the data taking effect with our initial solution.
Improvement 1: Increase bucket count
We were distributing our data by id and application, but we had yet to specify the number of buckets to distribute this data into (more info on StarRocks data distribution). Our hypothesis was that as we stored more and more data, these buckets got increasingly larger and more cumbersome which led us to the slow down that we were seeing. We consulted the StarRocks team who suggested that for our data volume, we should look at specifying somewhere on the order of 7200 buckets for the smaller of the two tables, so we changed our schema to look like this:
CREATE TABLE `exportActionLog` (
`id` bigint(20) NOT NULL COMMENT "",
`application` varchar(65533) NOT NULL COMMENT "",
`hash_value` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`, `application`)
DISTRIBUTED BY HASH(`id`, `application`) BUCKETS 7200
ORDER BY (`application`)
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "action_log_group",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "ZSTD"
);However, while we were able to load this data much more quickly than we were previously, we ran into this memory issue:
message: 'primary key memory usage exceeds the limit.
tablet_id: 10367, consumption: 126428346066, limit: 125241246351.
Memory stats of top five tablets: 53331(73M)53763(73M)53715(73M)53667(73M)53619(73M): Improvement 2: Partition the table by shard ID
We realized that it would make sense to just partition the table by the shardId and try to load it that way. This would allow us to specify a number of buckets for each partition, and they would be stored in a more efficient manner. Using some rough math, we found that:
actionLog => 10TB (Hashed) => 10 * 1024 / 148 shards = 69GB per shard =>
34 buckets to host it => add some buffer, 64 buckets per partitionIn total: 64 buckets per partition * 148 shards = 9472 buckets
We figured that using this distribution could also allow us to validate shard by shard which would help to not overwhelm the memory of the cluster. In the end we created this table and adjusted our loading statement to pull the shard ID from the S3 file path.
CREATE TABLE `exportActionLog` (
`id` bigint(20) NOT NULL COMMENT "",
`application` varchar(65533) NOT NULL COMMENT "",
`shard` int(11) NOT NULL COMMENT "",
`hash_value` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`, `application`, `shard`)
PARTITION BY (`shard`)
DISTRIBUTED BY HASH(`id`, `application`) BUCKETS 64
ORDER BY(`application`)
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "action_log_group_partition_by_shard",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "ZSTD"
);Press enter or click to view image in full size
This drastically sped up our ingestion time, LSDA data was successfully loaded into StarRocks from S3 in under 10 hours. The average throughput was approximately 2 billion rows per minute.
Archive
Now that the full RDS export, our source of truth data for validation, had been ingested into StarRocks, we needed to ingest the archive we were going to serve the data from and run our validation process across the two datasets. Unfortunately, the archive data was not stored in the same format as the export data which presented an additional set of challenges. Drawing from our experience with the export ingestion, we decided to create the tables in an identical way, with both the same number of buckets and the same partitioning strategy by shardId. This also allowed us to have the export and the archive in the same colocation group, such that we could use StarRocks’ colocation join.
CREATE TABLE `_rdsArchiveActionLog` (
`autoincr_id` bigint(20) NOT NULL COMMENT "",
`applicationId` varchar(65533) NOT NULL COMMENT "",
`shardId` int(11) NOT NULL COMMENT "",
`hash_value` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`autoincr_id`, `applicationId`, `shardId`)
PARTITION BY (`shardId`)
DISTRIBUTED BY HASH(`autoincr_id`, `applicationId`) BUCKETS 64
ORDER BY(`applicationId`)
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "action_log_group_partition_by_shard",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "ZSTD"
);Discrepancies in directory structure between RDS export and archive
However, a major difference between our export and our archive was the way in which they were stored in S3. For our export, it was stored in large directories, each corresponding to an individual shard and partition in StarRocks. This made our insert queries to StarRocks quite simple; we could just wildcard all of the parquet files across these directories, and from the directory we were able to get the shard ID which we could then insert into the row and use for partitioning.
For the archive, our data was stored by application to make serving from S3 in the application simpler. This meant that we had over 6 million small directories corresponding to each application that did not necessarily correspond to a given shard, some applications can be stored across multiple shards. We also had not stored the source shard information in S3, so unlike with our export, we had no ability to easily get our shardId from our single call to S3. Instead, we had stored yet to be validated file metadata in DynamoDb. Given this information, we created a Global Secondary Index in DynamoDb with a sort key as the shard ID, and then queried all of the files for that shardId to insert those.
Grouping inserts in StarRocks using Union
Additionally, StarRocks only lets you specify a single path for each insert statement, meaning that each file would need its own insert statement. This was in stark contrast to our export which only had about ~160 folders and corresponding insert statements. We now had more than 6 million insert statements for our archive. To solve this problem, we figured we could just try to heavily parallelize the process. We thought that we could just run multiple shards at once and run multiple processes per shard. Note, this is all through Node and TS, so the idea of multithreading for this is not really true, but we could run multiple processes which would run if others were blocked on I/O. We tried to run a single shard with 10 threads, but hit this problem:
message: 'Failed to load data into tablet 14775287,
because of too many versions, current/limit: 1006/1000.
You can reduce the loading job concurrency, or increase loading data batch size.
If you are loading data with Routine Load, you can increase
FE configs routine_load_task_consume_second and max_routine_load_batch_sizeEssentially, because multiple processes were running insertions at the same time, we were creating too many versions of the table for StarRocks to compact based on the compaction frequency. Additionally, you can no longer increase the version limit in StarRocks beyond 1000.
Given this, we tried to pursue a strategy which would let us reduce the number of insert queries to try to hand off more of the work to StarRocks (at this point it was running at low CPU and memory and the process was taking a while/failing). To do this, given that we can only specify a single path per select statement, we just created a for loop to generate an insert statement like this
INSERT INTO <table>
SELECT * FROM FILE_1
UNION ALL
SELECT * FROM FILE_2
……With this strategy, we were actually able to run this for 100 applications at once, speeding this up significantly. This allowed us to load all ~1 trillion rows on the archive side from 6 millions applications in just about 3 days.
Press enter or click to view image in full size
Acknowledgements
Thank you to Daniel Kozlowski, Kun Zhou, Matthew Jin and Xiaobing Xia for all of their contributions on this project.