Replicate RDS PostgreSQL to BigQuery using AirByte CDC


RK Kuppala


At Cloudside, one of the most recurring workloads we implement for our clients is building a data warehouse on Google Cloud Platform using BigQuery. As part of this, we work with multiple relational database sources like SQL Server, Oracle, MySQL, and PostgreSQL. Most of our clients would like to keep the data warehouse in sync with the source database with as little delay as possible, so tapping into the database's native CDC feature makes sense.

While GCP has multiple tools and approaches that can do CDC, like Datastream (currently Oracle and MySQL are supported; I hope PostgreSQL is in the works), Data Fusion replication (currently Oracle, MySQL, and SQL Server are supported), and Dataflow, we recently tested and implemented another open-source ETL tool, AirByte. We like it for its simplicity and ease of use. In a series of posts, we will look at syncing PostgreSQL, SQL Server, and MySQL to BigQuery, starting with PostgreSQL in this post. Let's get started!

Deploy AirByte

We will not go into the details of creating a VM and deploying AirByte in this post. It's pretty straightforward, and you can take a look at the step-by-step guide here to spin up a GCE instance and install AirByte on it. Since AirByte doesn't come with any authentication, I recommend deploying AirByte in an unmanaged instance group behind an HTTP(S) Load Balancer, and enabling Identity-Aware Proxy (IAP).

[Screenshots: HTTP(S) Load Balancer and Identity-Aware Proxy (IAP) configuration]

Prepare RDS PostgreSQL

We assume that the AWS and GCP VPCs have already been connected with a site-to-site VPN. Let's create a dedicated user for Airbyte replication with the minimum necessary (read-only) privileges.
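
The exact statements depend on your schemas; here is a minimal sketch, assuming a user named airbyte_user and tables in the public schema (on RDS, the rds_replication role grants the permission needed for logical replication):

-- User name and password are placeholders; replace with your own
CREATE USER airbyte_user WITH PASSWORD 'change-me';
-- RDS-specific role that allows the user to use logical replication
GRANT rds_replication TO airbyte_user;
-- Read-only access to the tables being synced (public schema assumed)
GRANT USAGE ON SCHEMA public TO airbyte_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO airbyte_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO airbyte_user;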

Publication, Replication Slot and AirByte Connections

AirByte's PostgreSQL CDC taps into PostgreSQL's native logical replication (using the pgoutput plugin). At a high level, we create a publication for selected tables, create a replication slot, and configure a sync. Below is a representation of a sample AirByte Connection. It has a source with the publication and replication slot specified, a destination, and a Connection that defines the tables to be replicated and the frequency at which the sync runs.


In a production environment, you are likely to sync more than a handful of tables, and some of these tables might benefit from grouping. For example, sync all inventory-related tables every 30 minutes and the transactions table every 5 minutes. I recommend splitting your source tables into a handful of categories like this, so that you can create a corresponding publication in PostgreSQL for each group. This helped us manage the connections better and debug each pipeline separately, instead of having all tables in a single connection. Here is a representation of what it looks like.


So, to summarize: we will create a separate publication in PostgreSQL for each group of tables, one BigQuery destination, and one AirByte connection per publication.

Create Publication(s)

Note: The AirByte documentation instructs you to create a replication slot before creating publications. However, I ran into a Debezium error that said "publication does not exist". Apparently the order matters, so create the publication(s) first and then create the replication slot.

In the source RDS instance, set the replica identity to default. Repeat this for all the tables you intend to sync:

ALTER TABLE tbl1 REPLICA IDENTITY DEFAULT;
-- ... repeat for every table in the sync ...
ALTER TABLE tbl_n REPLICA IDENTITY DEFAULT;

Now, create the publication(s).
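
For example, assuming the tables are split into an inventory group and a transactions group (the publication and table names below are placeholders), the publications might look like this:

-- One publication per group of tables; names are illustrative
CREATE PUBLICATION airbyte_inventory_pub FOR TABLE inventory_items, inventory_levels;
CREATE PUBLICATION airbyte_transactions_pub FOR TABLE transactions;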

Then, create a logical replication slot:

SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');
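
As an optional sanity check, you can confirm that the slot and publications exist by querying PostgreSQL's standard catalog views:

-- The replication slot should appear with the pgoutput plugin
SELECT slot_name, plugin, active FROM pg_replication_slots;
-- Publications and the tables assigned to each
SELECT pubname FROM pg_publication;
SELECT * FROM pg_publication_tables;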


We can now go ahead and create AirByte sources, destinations, and connections to start the sync.

AirByte Connections

Log on to the AirByte web UI.


Configure a destination first. We don't need multiple destinations, because that would be redundant; we will be writing all results to BigQuery datasets anyway. Before we proceed further, you need to have the following:

  • A service account that has enough access to write to BigQuery, and its credentials (we will use this to authenticate with BQ)
  • A Google Cloud Storage bucket. This will be used for staging data from the source before loading it into BigQuery. The other option of writing directly to BigQuery without staging is not efficient.
  • HMAC credentials for the service account. Follow the instructions here to create them.

Create a Destination

Go to "Destinations" in AirByte and configure one. The UI is self-explanatory; here is my configuration.


Save and close. Now, create a source for each of your publications.

Create Data Sources

In the AirByte UI, go to Sources and create one. Again, the configuration is self-explanatory. Just note that the name of the publication will change for each data source you create (if you are using multiple publications); all other details remain the same. Here is a sample configuration of one of my data sources, for inventory tables.


Under "Replication Method", choose Logical Replication (CDC), and fill in the publication name and replication slot name. Since this data source is for inventory tables, I am going to use the respective publication name. You can leave the initial waiting time in seconds at the default value.


Setup a Connection

Finally, it's time to connect the sources and the destination we created above using a connection. In the AirByte UI, go to "Connections" and configure one.


In the first screen, use the existing source that we created above.


Similarly, use the existing destination.


Give the connection an appropriate name, and choose the frequency at which you would like to sync. Under streams, I prefer "Mirror source structure" because it essentially maps the source's schema.table to dataset.table in BigQuery.


You will also see a bunch of tables (the ones you added to this publication in one of the previous steps).


The PostgreSQL connector in AirByte supports multiple sync modes, such as Full Refresh - Overwrite, Full Refresh - Append, Incremental - Append, and Incremental - Deduped History. In our case, we will choose Incremental - Deduped History because it creates two sets of tables apart from the temporary tables: table_scd (a history table with all changes, i.e. SCD Type 2) and the target deduped table (which mirrors the source table).
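
Once a sync and normalization complete, you can query both tables in BigQuery. A rough illustration, assuming a project named my_project, a dataset named inventory, and a table named items (the Airbyte metadata column names can vary across Airbyte versions):

-- Deduped table: mirrors the source, one row per primary key
SELECT * FROM `my_project.inventory.items`;
-- SCD history table: one row per change captured by CDC
-- (Airbyte adds metadata columns such as _airbyte_active_row to mark the current version)
SELECT * FROM `my_project.inventory.items_scd`;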

That’s it. You can now repeat this process for all other publications you have created and choose the existing target BQ destination.

In my case, I ended up with 5 different sources (1 source for each publication).


And 1 destination, the BigQuery target used by all connections.


And 5 connections, one for each publication.


I was able to successfully run the sync to BigQuery.

That’s about it. I hope you found this useful. Happy replication! :)