The key ideas leveraged by BigDataFlow
Bringing a database query planner inside Workflow Management Systems
Going back to the goal we introduced in Part 1 for our data pipeline platform: Users should only write the task
How does the project statically infer the following?
- the type of inputs and outputs
- the DAG of tasks
The first idea leveraged is: data transformations can be described in terms of SQL without restricting expressiveness.
Most databases let users implement user-defined functions in a programming language (often the one used to implement the database engine itself) to enrich the default catalog of scalar, aggregation and table functions. This way of extending SQL expressiveness gained popularity with query engines like Hive or SparkSQL, where adding User Defined Functions has almost no impact on the reliability of the system.
F1-Query or Presto/Trino are projects that also pushed SQL expressiveness beyond what a single database can do: Those query processor engines interact with several underlying databases and have support for running User Defined Functions on remote servers.
Following the same trend, we included a Foreign Transformation Interface (FTI) that allows the platform to execute data transformation programs written in foreign technologies such as Python or Spark. This generic User Defined Table Function provides an escape from pure SQL, which is key to onboarding Machine Learning workloads.
Once decided that data transformations are defined in terms of SQL, we can leverage its declarative power!
Datasets consume each other, so from the SQL queries we can build a graph of datasets that depend on one another.
In all directed acyclic graphs there are some root nodes that don’t depend on any others.
Those root nodes necessarily have data that comes from somewhere: either they read from external systems such as an RDBMS, or they are Hive tables made available by other systems. FTIs are used for both. At Criteo, root datasets are often Kafka topics dumped as hourly partitioned Hive tables on HDFS. Views on these external datasets can be declared in BigDataFlow to make them available as inputs to its internal jobs. Those views are generated and updated by automation. They use an FTI named EXTERNAL.
Because foreign transformations are opaque, notice that when using EXTERNAL (as well as any other FTIs) the output schema must be defined.
Now that initial nodes have schemas, for any other dataset in the graph: its schema can be derived from its SQL query and the schemas of the datasets it consumes. For the products_sales dataset:
Here we have the first of our two objectives: the type of inputs and outputs!
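The schema derivation described above can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions: a schema is a plain mapping from column name to type name, and a query is reduced to a list of projections. The names `Projection` and `derive_schema`, the column types, and the typing rules for aggregates are hypothetical and are not BigDataFlow's API.

```python
from dataclasses import dataclass
from typing import Optional

Schema = dict[str, str]  # column name -> type name


@dataclass(frozen=True)
class Projection:
    output: str                      # name of the output column
    source: str                      # upstream column that is read
    aggregate: Optional[str] = None  # e.g. "SUM", or None for a plain column


def derive_schema(upstream: Schema, projections: list[Projection]) -> Schema:
    """Derive the output schema of a query from the schema it consumes."""
    result: Schema = {}
    for p in projections:
        if p.source not in upstream:
            # This is the kind of error that can be reported statically.
            raise KeyError(f"unknown column: {p.source}")
        if p.aggregate == "COUNT":
            result[p.output] = "BIGINT"  # assumption: COUNT always yields BIGINT
        else:
            # Assumption: plain columns and SUM keep the type of their source.
            result[p.output] = upstream[p.source]
    return result


# Hypothetical schema for the upstream dataset (types are assumed).
stores_sales = {
    "product_id": "BIGINT", "nb_sales": "BIGINT", "revenue": "DOUBLE",
    "day": "STRING", "store": "STRING",
}
products_sales = derive_schema(stores_sales, [
    Projection("product_id", "product_id"),
    Projection("nb_sales", "nb_sales", "SUM"),
    Projection("revenue", "revenue", "SUM"),
])
```

Because every root dataset declares its schema, applying such a derivation in topological order gives a schema to every dataset in the graph.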
To tackle our second objective (DAG of tasks) let’s take a closer look at the addition made to SQL for scheduling:
The for-comprehension defines a list of scopes.
On 31-03-2022, the TIMESERIES scalar function produces an array of two timestamps, and the for-comprehension produces the following 4 scopes:
t = 31-03-2022 00:00:00, day=2022-03-31, store=Detroit
t = 31-03-2022 00:00:00, day=2022-03-31, store=Paris
t = 30-03-2022 00:00:00, day=2022-03-30, store=Detroit
t = 30-03-2022 00:00:00, day=2022-03-30, store=Paris

The 4 scopes lead to 4 insert partitions queries (aka Tasks):
INSERT "2022-03-31" as day, "Detroit" as store SELECT ...
INSERT "2022-03-31" as day, "Paris" as store SELECT ...
INSERT "2022-03-30" as day, "Detroit" as store SELECT ...
INSERT "2022-03-30" as day, "Paris" as store SELECT ...

BigDataFlow has a command line tool that prints this information. On 31-03-2022 we get the following output:
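The expansion of a for-comprehension into scopes, and of scopes into insert tasks, can be sketched as a cartesian product. The Python below is a rough illustration only: `timeseries` is a hypothetical stand-in for the TIMESERIES function (its real parameters are not described here), the list of stores is hard-coded, and the day is rendered in the yyyy-MM-dd form used by the scopes.

```python
from datetime import datetime, timedelta
from itertools import product


def timeseries(now: datetime, days: int) -> list[datetime]:
    """Hypothetical stand-in for TIMESERIES: midnight of `now` and of the days before it."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return [midnight - timedelta(days=i) for i in range(days)]


def scopes(now: datetime, stores: list[str]) -> list[dict]:
    """One scope per (timestamp, store) pair, like the for-comprehension."""
    return [
        {"t": t, "day": t.strftime("%Y-%m-%d"), "store": store}
        for t, store in product(timeseries(now, 2), stores)
    ]


def insert_statement(scope: dict) -> str:
    """Render the insert-partition query (the task) for one scope."""
    return f'INSERT "{scope["day"]}" as day, "{scope["store"]}" as store SELECT ...'


all_scopes = scopes(datetime(2022, 3, 31, 10, 30), ["Detroit", "Paris"])
tasks = [insert_statement(s) for s in all_scopes]
```

Evaluated at any time on 31-03-2022, this yields four scopes and therefore four tasks, one per (day, store) partition.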
Now, for the same scheduling day 31-03-2022, let's look at the data transformation of the products_sales day=2022-03-31 partition.
-- For scope:
-- t = 31-03-2022 00:00:00,
-- partition_day = 2022-03-31
INSERT "2022-03-31" as day
SELECT product_id,
       SUM(nb_sales) AS nb_sales,
       SUM(revenue) AS revenue
FROM bigdataflow.stores_sales
WHERE stores_sales.day = "2022-03-31"
GROUP BY product_id

Using the FROM + WHERE construct of the query:
FROM bigdataflow.stores_sales
WHERE stores_sales.day = "2022-03-31"

We know that this task depends on the stores_sales partitions that match this WHERE filter. We just identified the 4 partitions that are supposed to exist in stores_sales, so by applying the day = "2022-03-31" filter to them we derive that bigdataflow.products_sales(day=2022-03-31) depends on bigdataflow.stores_sales(day=2022-03-31, store="Detroit") and bigdataflow.stores_sales(day=2022-03-31, store="Paris").
The BigDataFlow command line tool displays this information:
➜ bdf show bigdataflow.products_sales day=2022-03-31

Evaluation Scope
================

Name          | Value                | Type
--------------+----------------------+----------
partition_day | 2022-03-31           | STRING
t             | 2022-03-31T00:00:00Z | TIMESTAMP

Partition columns
=================

Column | Value      | Type
-------+------------+-------
day    | 2022-03-31 | STRING

Dependencies
============

- bigdataflow.stores_sales/day=2022-03-31/store=Paris
- bigdataflow.stores_sales/day=2022-03-31/store=Detroit

WHERE filters can be more complex than this. A frequent pattern looks like:

WHERE stores_sales.day BETWEEN DATE_SUB(partition_day, 3) AND partition_day

Identifying the data read by a query is exactly what database query planners do when building the logical plan. Query planners then optimize that logical plan and translate it into a physical execution plan used to compute the result of an SQL query. The difference with an actual SQL engine is that we only do the first part of the query planning work, and instead of planning against the actual state of the data we plan against the theoretical listing of partitions, regardless of whether they are available or not.
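As a rough Python sketch of this idea, the dependencies of a task can be found by applying the partition filter to the theoretical listing of upstream partitions. The function names are hypothetical, and the filter is hard-coded as a day range; a real planner would derive it from the WHERE clause of the query.

```python
from datetime import date, timedelta


def theoretical_partitions(days: list[date], stores: list[str]) -> list[dict]:
    """Partitions that are supposed to exist, whether or not they are available yet."""
    return [{"day": d.isoformat(), "store": s} for d in days for s in stores]


def dependencies(partitions: list[dict], partition_day: date, lookback_days: int = 0) -> list[dict]:
    """Partitions matching: day BETWEEN DATE_SUB(partition_day, lookback_days) AND partition_day."""
    low = (partition_day - timedelta(days=lookback_days)).isoformat()
    high = partition_day.isoformat()
    # ISO dates (yyyy-MM-dd) sort lexicographically in chronological order.
    return [p for p in partitions if low <= p["day"] <= high]


stores_sales = theoretical_partitions(
    [date(2022, 3, 31), date(2022, 3, 30)], ["Detroit", "Paris"]
)
same_day = dependencies(stores_sales, date(2022, 3, 31))
last_days = dependencies(stores_sales, date(2022, 3, 31), lookback_days=3)
```

With an equality filter on the day, only the Detroit and Paris partitions of 2022-03-31 are selected. With the three-day lookback, all four listed partitions become dependencies.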
By identifying the dependencies of all partitions to compute (aka: tasks or jobs) we get our DAG of tasks!
In short, we brought database query planning to workflow management systems.
This allows us to only ask users to write the tasks in the format of SQL queries.
The BigDataFlow scheduler doesn’t have a state of its own distinct from the underlying data state.
Existence or absence of a partition of data in the underlying system (= Hive Metastore) determines if it has to be computed or if it’s already done. The BigDataFlow scheduling algorithm compares the list of tasks found in its DAG and the partitions of data that actually exist in the underlying systems. For each task where the output is missing and dependencies are available, it launches the task execution. A symmetric mechanism is at play for data retention: as time passes old data partitions that don’t have a corresponding task in the DAG anymore are dropped.
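One pass of such a stateless scheduling loop might look like the Python sketch below. It assumes that the DAG is given as a mapping from each output partition to the partitions it depends on, and that the partition listings come from the metastore. The `plan` function and the short partition names are hypothetical.

```python
def plan(tasks: dict[str, set[str]], existing: set[str], external: set[str]):
    """Decide what to run and what to drop from the data state alone.

    tasks:    output partition -> partitions it depends on (the DAG of tasks)
    existing: partitions of platform-managed datasets that currently exist
    external: available partitions of root (external) datasets
    """
    available = existing | external
    # Run every task whose output is missing and whose dependencies are all available.
    to_run = sorted(
        output for output, deps in tasks.items()
        if output not in existing and deps <= available
    )
    # Retention: drop managed partitions that no longer have a task in the DAG.
    to_drop = sorted(existing - set(tasks))
    return to_run, to_drop


tasks = {"b/1": {"a/1"}, "c/1": {"b/1"}, "b/2": {"a/2"}}
to_run, to_drop = plan(tasks, existing={"b/0"}, external={"a/1"})
```

In this example only `b/1` can run: `c/1` waits for `b/1`, and `b/2` waits for the external partition `a/2`. The old partition `b/0` has no task anymore and is dropped. Running the same function again after `b/1` has been written would schedule `c/1`, without the scheduler keeping any state of its own.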
By inferring the DAG of tasks statically, we saw that the platform can then take care of all the things workflow management systems do:
- schedule tasks executions
- drop data out of retention
- backfill data: when a user signals that a range of partitions of a dataset had invalid data, recompute those as well as all downstream data derived from them
And, by also inferring tasks input and output types, it provides support for managing breaking changes!
When you drop or rename a column in a dataset, errors are identified statically at compile time if the change breaks downstream datasets that rely on it. To merge such a change, the impacted downstream datasets must be adapted as part of the change or prior to it, which sometimes requires several teams to collaborate on the code review. Identification of impacted users and orchestration of the release are now a no-brainer. The flow is always valid and continuously deployed. Users only have to decide how to adapt the pipeline to a given change.
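A minimal Python sketch of such a static check is given below. It assumes that the columns each downstream dataset reads have already been extracted from its SQL query. The `breaking_changes` function, the data layout and the error message are hypothetical.

```python
def breaking_changes(schemas: dict[str, set[str]],
                     reads: dict[str, dict[str, list[str]]]) -> list[str]:
    """List downstream references to columns that no longer exist.

    schemas: dataset -> column names after the proposed change
    reads:   downstream dataset -> {upstream dataset: columns it reads}
    """
    errors = []
    for downstream, upstreams in sorted(reads.items()):
        for upstream, columns in sorted(upstreams.items()):
            missing = sorted(set(columns) - schemas.get(upstream, set()))
            for column in missing:
                errors.append(f"{downstream}: column {upstream}.{column} does not exist")
    return errors


# Proposed change: the `revenue` column is dropped from stores_sales.
schemas = {"stores_sales": {"product_id", "nb_sales", "day", "store"}}
reads = {"products_sales": {"stores_sales": ["product_id", "nb_sales", "revenue", "day"]}}
errors = breaking_changes(schemas, reads)
```

The change cannot be merged while the error list is non-empty, which points the author directly at the downstream datasets (and teams) that need to adapt.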
In the last post we will share our experience running this platform in production at Criteo.
Let me side track a little from the main story
There are many similarities between what we do and what build systems do as well as with continuous deployment of dependent services.
From the build system perspective, an insightful read is “Build Systems à la Carte”. The following definition of a build system given in that paper applies to BigDataFlow!
The goal of any build system is to bring up to date a store that implements a mapping from keys to values.
In BigDataFlow the passage of time alone causes the set of tasks to change, which is unusual in build systems but doesn’t fundamentally change the problem at hand.
BigDataFlow can be described using the semantic framework for comparing build systems developed in that paper: It has a restarting scheduler like Excel and Bazel (We can’t use a topological scheduler because we support some forms of dynamic dependencies not detailed in this post) and a dirty bit rebuilder (Data partitions to backfill are flagged in a persistent store).
Unlike most build systems rebuilding downstream tasks is not systematically desirable for data pipelines: the cost can be prohibitive. Using a dirty bit rebuilder allows, when creating a backfill, to select what part of the downstream graph should be backfilled.
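Selecting which part of the downstream graph to backfill can be sketched in Python as a breadth-first traversal that flags partitions as dirty, with a predicate to prune branches. The `flag_dirty` function, the `include` predicate and the short partition names are hypothetical. In the platform the flags live in a persistent store, here replaced by an in-memory set.

```python
from collections import deque


def flag_dirty(consumers: dict[str, list[str]], roots: list[str],
               include=lambda partition: True) -> set[str]:
    """Flag the backfilled partitions and the selected part of their downstream graph.

    consumers: partition -> partitions that read it
    include:   predicate choosing which downstream partitions to recompute
    """
    dirty = set(roots)
    queue = deque(roots)
    while queue:
        current = queue.popleft()
        for child in consumers.get(current, ()):
            if child not in dirty and include(child):
                dirty.add(child)
                queue.append(child)
    return dirty


consumers = {"a/1": ["b/1", "c/1"], "b/1": ["d/1"], "c/1": ["e/1"]}
everything = flag_dirty(consumers, ["a/1"])
without_c = flag_dirty(consumers, ["a/1"], include=lambda p: not p.startswith("c/"))
```

Excluding `c/1` also leaves `e/1` untouched, since it is only reachable through the excluded branch. This is the kind of cost control that a systematic downstream rebuild would not allow.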
Another difference with the build systems studied in that paper is that the identification of task dependencies (presented in this post) is expensive, especially with complex WHERE predicates. To avoid recomputing them over and over with our restarting scheduler, we cache computed task dependencies and use a Merkle hash of the depended-on datasets to validate or invalidate them.
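The caching scheme can be sketched in Python as follows. What exactly is hashed is not detailed here, so the sketch assumes that the hash of a dataset combines its own definition (for instance its SQL text) with the hashes of the datasets it depends on. `merkle_hash` and `DependencyCache` are hypothetical names.

```python
import hashlib


def merkle_hash(dataset, definitions, upstreams, memo=None):
    """Hash of a dataset definition combined with the hashes of its upstream datasets."""
    memo = {} if memo is None else memo
    if dataset not in memo:
        h = hashlib.sha256(definitions[dataset].encode("utf-8"))
        for parent in sorted(upstreams.get(dataset, ())):
            h.update(merkle_hash(parent, definitions, upstreams, memo).encode("ascii"))
        memo[dataset] = h.hexdigest()
    return memo[dataset]


class DependencyCache:
    """Keeps computed task dependencies until the Merkle hash changes."""

    def __init__(self):
        self._entries = {}  # task -> (hash, dependencies)

    def get(self, task, current_hash, compute):
        cached = self._entries.get(task)
        if cached is not None and cached[0] == current_hash:
            return cached[1]      # still valid: skip the expensive planning
        deps = compute()          # invalidated or unknown: plan again
        self._entries[task] = (current_hash, deps)
        return deps


definitions = {"stores_sales": "EXTERNAL ...", "products_sales": "SELECT ..."}
upstreams = {"products_sales": ["stores_sales"]}
before = merkle_hash("products_sales", definitions, upstreams)
definitions["stores_sales"] = "EXTERNAL ... (changed)"
after = merkle_hash("products_sales", definitions, upstreams)
```

A change anywhere upstream changes the hash of every dataset downstream of it, so a single comparison is enough to know whether cached dependencies can be reused.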
Raph (@heapoverflow ) on behalf of the Data Processing team