At Cloudside, one of the most common workloads we implement for our clients is building a data warehouse on Google Cloud Platform using BigQuery. As part of this, we work with multiple relational database sources like SQL Server, Oracle, MySQL, and PostgreSQL. Most of our clients want the warehouse to stay in sync with the source database with as little delay as possible, so tapping into the databases' native CDC capabilities makes sense.
While GCP has multiple tools and approaches for CDC, such as Datastream (currently Oracle and MySQL are supported; I hope PostgreSQL is in the works), Data Fusion replication (currently Oracle, MySQL, and SQL Server are supported), and Dataflow, we recently tested and implemented another open-source ETL tool: Airbyte. We like it for its simplicity and ease of use. In a series of posts, we will look at syncing PostgreSQL, SQL Server, and MySQL to BigQuery, starting with PostgreSQL in this post. Let's get started!
Deploy Airbyte
We will not go into the details of creating a VM and deploying Airbyte in this post. It's pretty straightforward, and you can take a look at the step-by-step guide here to spin up a GCE instance and install Airbyte on it. Since Airbyte doesn't come with any authentication, I recommend deploying it in an unmanaged instance group behind an HTTPS load balancer and enabling Identity-Aware Proxy (IAP).
Prepare RDS PostgreSQL
We assume that the AWS and GCP VPCs have already been connected with a site-to-site VPN. Let's create a dedicated user for Airbyte replication with the minimum necessary privileges (read-only).
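Here is a minimal sketch of the statements involved, assuming the default public schema; the user name and password are placeholders, and rds_replication is the RDS-managed role that grants access to logical replication:

-- Logical replication must be enabled first; on RDS this means setting
-- rds.logical_replication = 1 in the DB parameter group, after which
-- wal_level should report 'logical'
SHOW wal_level;

-- Dedicated read-only user for Airbyte (name and password are placeholders)
CREATE USER airbyte_user WITH PASSWORD 'choose-a-strong-password';
GRANT rds_replication TO airbyte_user;  -- RDS-managed role for replication access
GRANT USAGE ON SCHEMA public TO airbyte_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO airbyte_user;
-- Also cover tables created in this schema later
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO airbyte_user;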
Publication, Replication Slot, and Airbyte Connections
Airbyte's PostgreSQL CDC taps into PostgreSQL's native logical replication (under the hood it uses Debezium with the pgoutput plugin). At a high level, we create a publication for select tables, create a replication slot, and configure a sync. A sample Airbyte connection consists of a source with a publication and replication slot specified, a destination, and a connection definition that determines which tables are replicated and how often the sync runs.
In a production environment, you are likely to sync more than a handful of tables, and some of these tables might benefit from grouping: for example, syncing all inventory-related tables every 30 minutes and the transactions table every 5 minutes. I recommend splitting your source tables into a handful of categories like this and creating a corresponding publication in PostgreSQL for each group. This helped us manage the connections better and debug an individual pipeline rather than troubleshooting all tables in a single connection.
So to summarize: we will create a separate publication in PostgreSQL for each group of tables, one BigQuery destination, and one Airbyte connection per publication.
Create Publication(s)
Note: The Airbyte documentation instructs you to create a replication slot before creating publications. However, I ran into a Debezium error saying "publication does not exist". Apparently the order matters, so create the publication(s) first and then create the replication slot.
In the source RDS instance, set the replica identity to DEFAULT, which identifies changed rows by their primary key in the replication stream. Repeat this for all the tables you intend to sync:
ALTER TABLE tbl1 REPLICA IDENTITY DEFAULT;
...
ALTER TABLE tbl_n REPLICA IDENTITY DEFAULT;

Now, create the publication(s).
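The publication statements themselves aren't shown above, so here is a minimal sketch following the grouping idea from earlier; the publication and table names are hypothetical placeholders:

-- One publication per table group (all names here are placeholders)
CREATE PUBLICATION pub_inventory FOR TABLE inventory_items, inventory_levels;
CREATE PUBLICATION pub_transactions FOR TABLE transactions;

-- Confirm which tables each publication covers
SELECT * FROM pg_publication_tables;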
Create a logical replication slot
SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');
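As a quick sanity check (my addition, not a required step), you can confirm the slot exists and uses the pgoutput plugin:

-- 'active' stays false until Airbyte starts consuming from the slot
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'airbyte_slot';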
We can now go ahead and create Airbyte sources, destinations, and connections to start the sync.
Airbyte Connections
Log on to the Airbyte web UI.
Configure a destination first. We don't need multiple destinations; that would be redundant, since all connections write to datasets in the same BigQuery project. Before we proceed further, you need the following:
- A service account with enough access to write to BigQuery, and its key JSON (we will use this to authenticate with BigQuery)
- A Google Cloud Storage bucket. This will be used for staging data from the source before loading into BigQuery. The other option, writing directly to BigQuery without staging, is not efficient.
- HMAC credentials for the service account. Follow the instructions here to create them.
Create a Destination
Go to “Destinations” in Airbyte and configure one. The UI is self-explanatory; here is my configuration.
[Screenshots: BigQuery destination configuration]
Save and close. Now, create a source for each of your publications.
Create Data Sources
In the Airbyte UI, go to “Sources” and create one. Again, the configuration is self-explanatory. Just note that the publication name will change for each data source you create (if you are using multiple publications); all other details remain the same. Here is a sample configuration of one of my data sources, for the inventory tables:
[Screenshot: PostgreSQL source configuration for the inventory tables]
Under “Replication Method”, choose Logical Replication (CDC) and fill in the publication name and replication slot name. Since this data source is for the inventory tables, I use the corresponding publication name. You can leave the initial waiting time (in seconds) at the default value.
Set Up a Connection
Finally, it's time to connect the source and the destination we created above using a connection. In the Airbyte UI, go to “Connections” and configure one.
In the first screen, select the existing source that we created above.
Similarly, select the existing destination.
Give the connection an appropriate name and choose the frequency at which you would like to sync. Under the stream settings, I prefer “Mirror source structure” because it essentially maps schema.table in the source to dataset.table in BigQuery.
You will also see a bunch of tables (the ones you added to this publication in one of the previous steps).
The PostgreSQL connector in Airbyte supports multiple sync modes: Full Refresh + Overwrite, Full Refresh + Append, Incremental + Append, and Incremental + Deduped History. In our case, we will choose Incremental + Deduped History because, apart from the temporary tables, it creates two sets of tables: a table_scd history table with all changes (an SCD Type 2 table) and the deduplicated target table (mirroring the source table).
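To make the two outputs concrete, here is a sketch of how you might inspect them in BigQuery after a sync has run; the project, dataset, and table names are placeholders, and the metadata columns shown are the ones Airbyte's normalization typically emits:

-- History (SCD Type 2) table: one row per change, with CDC metadata
-- (project/dataset/table names are placeholders)
SELECT id, _ab_cdc_updated_at, _ab_cdc_deleted_at, _airbyte_emitted_at
FROM `my-project.inventory.inventory_items_scd`
ORDER BY _airbyte_emitted_at DESC
LIMIT 10;

-- Deduplicated target table: one current row per primary key, mirroring the source
SELECT * FROM `my-project.inventory.inventory_items` LIMIT 10;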
That’s it. You can now repeat this process for all the other publications you have created, reusing the existing BigQuery destination.
In my case, I ended up with five sources (one per publication), a single BigQuery destination used by all connections, and five connections (one per publication). I was able to successfully run the sync to BigQuery.
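One operational note before wrapping up (my addition, not part of the setup steps): PostgreSQL retains WAL for a replication slot until its consumer confirms it, so a stalled or paused connection can fill up the source instance's storage. It's worth keeping an eye on slot lag, assuming the slot name from earlier:

-- A steadily growing lag means Airbyte isn't keeping up (or a sync is paused)
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name = 'airbyte_slot';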
That’s about it. I hope you found this useful. Happy replication! :)