Data Contracts in Action: Testing
There has been a lot of talk about data contracts, but not much action. Let us see them in action when applied to testing data pipelines.
Background
Whether or not you have heard of data contracts before, here is a brief introduction: a data contract is an agreement between data producers and consumers. It contains information about the data source, schema, ownership, data quality and more, helping both parties understand what they can expect from the data. Think of it as the OpenAPI Spec, which has become the industry standard for describing REST APIs, but for all data.
The Open Data Contract Standard is the leading open-source project that aims to standardise the format of data contracts so that:
- Tools can be built based on a common standard
- Producers and consumers of data have a common understanding of what the data represents
- Complex data flows can be simplified and organised
Data Pipelines
One of the main use cases where data contracts are applied is in data pipelines. In many organisations, data pipelines move data from one data source to another, usually with some transformations applied. Data contracts supply the glue needed for data pipelines to:
- Ingest the data correctly from upstream data sources
- Allow downstream consumers to understand what data is available and how to consume it
The schema definition is the main part of the contract that the data producer and consumer care about.
Example Scenario
Let’s use an example to illustrate the role of data contracts in the data flow. The scenario: we have a Postgres database with a table called accounts. A job (the data consumer) reads this table and pushes the data into a parquet file. The schema of the accounts table, as defined in the Open Data Contract Standard, looks like this:
dataset:
- table: accounts
  columns:
  - column: account_number
    logicalType: integer
    physicalType: integer
  - column: name
    logicalType: string
    physicalType: varchar(20)

Let’s say account_number is approaching the upper limit of what an integer column can store in the Postgres database. So the team decides to migrate to a new table where account_number uses the data type bigint.
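For a sense of the limits involved: a Postgres integer is a signed 32-bit value while bigint is a signed 64-bit value, which map to Scala’s Int and Long. A quick check, purely for illustration:

// Postgres integer tops out at the 32-bit signed maximum, bigint at the 64-bit one.
println(Int.MaxValue)  // 2147483647
println(Long.MaxValue) // 9223372036854775807

The contract for the new table then looks like this: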
dataset:
- table: accounts_long
  columns:
  - column: account_number
    logicalType: number
    physicalType: bigint
  - column: name
    logicalType: string
    physicalType: varchar(20)

In theory, if our data consumers use the physicalType or logicalType from the contract to read the dataset, migrating to this new table should be seamless and require no extra effort. But how do we know for sure? As professional data engineers, we know we should test this data flow.
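Before we do, here is what “utilising the logicalType” could look like on the consumer side. This is a minimal, hypothetical Scala sketch (the helper is purely illustrative and not part of any particular library) that picks the JDBC getter based on the type declared in the contract, so the same code handles both accounts and accounts_long:

import java.sql.ResultSet

// Hypothetical helper: choose the JDBC getter from the contract's logicalType so the
// consumer keeps working when account_number migrates from integer to bigint.
def readAccountNumber(rs: ResultSet, logicalType: String): Long = logicalType match {
  case "integer" => rs.getInt("account_number").toLong // accounts table
  case "number"  => rs.getLong("account_number")       // accounts_long table
  case other     => sys.error(s"Unsupported logicalType for account_number: $other")
}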
Testing
To truly test the data flow, we should use integration tests (as I explored in this article). These replicate the production setup: we connect directly to the data sources (running the same versions), consume the data, process it and push it to downstream sources. Data contracts provide us with the metadata required to help generate production-like data.
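For the “same versions” part, one common approach (independent of Data Caterer) is to start the real database from the test itself using containers, for example with Testcontainers. A minimal sketch in Scala, assuming the testcontainers-scala-postgresql dependency:

import com.dimafeng.testcontainers.PostgreSQLContainer
import org.testcontainers.utility.DockerImageName

// Start the same Postgres version as production so the test hits a real database.
val postgres = PostgreSQLContainer(DockerImageName.parse("postgres:16"))
postgres.start()

val jdbcUrl = postgres.jdbcUrl // hand this to the job under test
// ... generate data, run the job, then validate the parquet output ...
postgres.stop()

The database itself is only half of the setup, though; we still need production-like data inside it.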
This is where we can leverage the power of Data Caterer. Data is generated into the upstream data sources according to the metadata in the data contract (you can also define additional metadata). Once the application or job has consumed the data, we can run data validations on the produced data set to check whether the data flowed as expected.
Depending on your preferred interface, your data generation in Data Caterer will look like the following (the full guide can be found here):
Java
var accountTask = postgres("my_postgres", "jdbc:postgresql://host.docker.internal:5432/customer")
.schema(metadataSource().openDataContractStandard("/opt/app/mount/odcs/my-data-contract.odcs.yaml"))
.count(count().records(100));

Scala
val accountTask = postgres("my_postgres", "jdbc:postgresql://host.docker.internal:5432/customer")
.schema(metadataSource.openDataContractStandard("/opt/app/mount/odcs/my-data-contract.odcs.yaml"))
.count(count.records(100))

YAML
name: "generate_accounts"
steps:
- name: "accounts"
type: "postgres"
options:
metadataSourceType: "open_data_contract_standard"
dataContractFile: "/opt/app/mount/odcs/my-data-contract.odcs.yaml"
count:
records: 100UI
An example of how to create a new metadata connection to an Open Data Contract Standard file can be found here.
Validation
Once we have generated data, we want to validate that it was consumed correctly. A variety of validations are available, as seen in the example below (check here for all possible types of validation):
---
name: "account_checks"
description: "Check account related fields have gone through the system correctly"
dataSources:
  parquet:
  - options:
      path: "my/big-data/parquet/accounts"
    validations:
    - expr: "account_number > 0"
    - expr: "ISNOTNULL(name)"
      errorThreshold: 0.1 #allow for some null values
      description: "It's okay if less than 10% of names are missing"
    - preFilterExpr: "name == 'peter'" #before running validation, filter the data
      expr: "account_number > 50"
      description: "We only allowed peter to get an account after 50 accounts"
    - groupByCols: ["account_number"]
      aggType: "count"
      aggExpr: "count == 1" #check that for each account_number, there is only one record (i.e. it is unique)
    - columnNameType: "column_name_match_order"
      names: ["account_number", "name"]
    - upstreamDataSource: "my_postgres"
      joinColumns: ["account_number"]
      joinType: "anti"
      validation:
        aggType: "count"
        aggExpr: "count == 0" #check no records are missing between Postgres and the parquet file
    - upstreamDataSource: "my_postgres"
      joinColumns: ["account_number"]
      validation:
        expr: "my_postgres_name == name" #check the names match

Benefits
Your integration tests now:
- Dynamically generate data based on the data contract
- Are technology agnostic
- Are customisable
- Replicate production
This provides you and your team with other benefits such as:
- A common understanding amongst team members of the data pipeline expectations (useful for new members or those who haven’t worked on that particular pipeline)
- Fast feedback loop on testing end-to-end
- Confidence that your job will run successfully in production
- Ability to easily replicate production bugs for debugging issues
Future Improvements
As part of v3 of the Open Data Contract Standard, there will be improvements made to the data quality section. This will enable data validations to automatically be picked up from the data contract to ensure the data produced adheres to the defined data quality rules.
Another exciting improvement you can try out (currently in beta) is insta-integration. This project aims to make integration tests even simpler by also starting up the external services your application or job uses (i.e. databases, messaging systems, data catalogs, job orchestrators, query engines), along with any startup data, then running your application/job with Data Caterer’s data generation and validation. This lets you run the whole required infrastructure locally on your laptop for fast feedback loops. Additionally, you can embed this in your CI/CD (such as GitHub Actions) to further ensure the stability of your data pipelines.
Other data contract articles I’ve written can be found here.
Thanks for reading!