Golang — CDC using AWS lambda and Aurora


brian david

Today we are going to discuss a simple architecture for capturing data changes in a database, typically referred to as change data capture (CDC). For this tutorial we will implement the Lambda in Golang, but any language supported by AWS Lambda can be used.

We will be using MySQL as the database. Because Amazon RDS for MySQL does not currently support calling the stored procedure mysql.lambda_async, we will be using Amazon’s Aurora MySQL database.

CDC diagram

First, let's create a simple table in our brand new Aurora database:

CREATE TABLE `users` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;

Next, let's add a trigger to the new table that calls a built-in AWS stored procedure:

DELIMITER $$
CREATE TRIGGER Insert_Users_Trigger
AFTER INSERT
ON users FOR EACH ROW
BEGIN
  -- userId is emitted as a JSON number so it decodes into an int64 field
  CALL mysql.lambda_async('<lambda-arn>',
    CONCAT('{ "userId" : ', NEW.id,
      ', "eventType" : "InsertUserEvent"',
      ', "createdAt" : "', NEW.created_at,
      '", "streamName" : "<kinesis-stream-name>" }')
  );
END$$
DELIMITER ;

Here we must replace <lambda-arn> with the function ARN of our Lambda, and <kinesis-stream-name> with the name of the stream created in the AWS console. IAM policies, roles, and permissions must also be granted for this to work; in particular, the Aurora cluster needs a role that allows it to invoke the function. This is left up to the reader, but it can all be configured in the AWS console.

This is a very simple event and can be expressed in Golang as a struct like:

type InsertUserEvent struct {
	UserId     int64  `json:"userId"`
	EventType  string `json:"eventType"`
	CreatedAt  string `json:"createdAt"`
	StreamName string `json:"streamName"`
}
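To see how a trigger payload maps onto this struct, here is a minimal sketch that decodes a sample payload with encoding/json, which is also what the Go Lambda runtime library uses to decode the handler's input. The payload values and the decodeEvent helper are made up for illustration. Note that UserId is an int64, so the payload needs to carry userId as a JSON number; a quoted id fails to decode unless the tag is changed to `json:"userId,string"`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// InsertUserEvent mirrors the struct shown above.
type InsertUserEvent struct {
	UserId     int64  `json:"userId"`
	EventType  string `json:"eventType"`
	CreatedAt  string `json:"createdAt"`
	StreamName string `json:"streamName"`
}

// decodeEvent is a hypothetical helper that parses a raw trigger
// payload into an InsertUserEvent.
func decodeEvent(raw []byte) (InsertUserEvent, error) {
	var evt InsertUserEvent
	err := json.Unmarshal(raw, &evt)
	return evt, err
}

func main() {
	// Sample payload; the values are assumptions for illustration only.
	raw := []byte(`{ "userId" : 1, "eventType" : "InsertUserEvent", "createdAt" : "2019-01-01 00:00:00", "streamName" : "example-stream" }`)

	evt, err := decodeEvent(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", evt)

	// A quoted id ("userId" : "1") does not fit an int64 field.
	_, err = decodeEvent([]byte(`{ "userId" : "1" }`))
	fmt.Println("quoted id rejected:", err != nil)
}
```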

Now let's create the Lambda function that will receive the above event:

package main

import (
	"context"
	"encoding/json"
	"log"
	"strconv"
	"strings"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

var (
	// PathSeparator is the character used to separate the elements
	// of the partitionKey.
	//
	// For example, `userId:createdAt`
	PathSeparator string = ":"
	region        string = "us-west-2"
)

// InsertUserEvent is the payload sent by the MySQL trigger.
type InsertUserEvent struct {
	UserId     int64  `json:"userId"`
	EventType  string `json:"eventType"`
	CreatedAt  string `json:"createdAt"`
	StreamName string `json:"streamName"`
}

// Producer wraps the kinesis client used to publish records.
type Producer struct {
	Client *kinesis.Kinesis
}

// GetKinesisClient - returns a kinesis client for the given region
func GetKinesisClient(region string) *kinesis.Kinesis {
	// session.New is deprecated; NewSession surfaces configuration errors
	s := session.Must(session.NewSession(&aws.Config{Region: aws.String(region)}))
	return kinesis.New(s)
}

// Put - put a single record onto a kinesis stream
func (producer *Producer) Put(ctx context.Context, streamName string, data []byte, partitionKey string) (*kinesis.PutRecordOutput, error) {
	return producer.Client.PutRecordWithContext(
		ctx,
		&kinesis.PutRecordInput{
			Data:         data,
			StreamName:   aws.String(streamName),
			PartitionKey: aws.String(partitionKey),
		})
}

// GetPartitionKey - returns a string partitionKey
func GetPartitionKey(stringArray []string) string {
	return strings.Join(stringArray, PathSeparator)
}

// HandleRequest - forwards the trigger event to the kinesis stream named in it
func HandleRequest(ctx context.Context, evt InsertUserEvent) error {
	kc := GetKinesisClient(region)
	producer := &Producer{Client: kc}

	// convert struct to json payload
	payload, err := json.Marshal(evt)
	if err != nil {
		return err
	}

	// puts json payload onto kinesis; strconv is used instead of
	// fmt.Sprintln, which would append a newline to the user id
	_, err = producer.Put(
		ctx,
		evt.StreamName,
		payload,
		GetPartitionKey([]string{strconv.FormatInt(evt.UserId, 10), evt.CreatedAt}),
	)
	return err
}

func main() {
	// change the flags on the default logger
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	lambda.Start(HandleRequest)
}

The code above assumes that the stream named in the event exists. Guarding against a missing stream is a simple matter of adding a call to DescribeStream before the put.

And that's it for Part 1. We have created a very simple CDC pipeline that grabs insert events out of the database and places them on a streaming service, AWS Kinesis. There are many benefits to this approach:

  1. This Lambda can be used as a router because the eventType and streamName are passed in by the MySQL trigger. The Kinesis stream can grow by adding additional shards.
    * Things to think about: adding additional fields to the trigger payload, maybe tableName and databaseName?
  2. For an insert trigger, including the primary key is beneficial because we can add logic in the Lambda function to query the database by primary key and grab additional columns. It is also possible to add additional fields/columns to our JSON payload in the trigger.
    * Updates are more difficult: if a client needs to act on what changed, the payload must include not only all the new values but also the old ones. This can cause the payload to grow in size, and I currently do not know of an easy way to serialize the New/Old values without referencing each column individually, for instance: New.id, Old.id
  3. Because AWS Kinesis is a streaming service, it is possible to have multiple clients listening to the same message. For instance, we could have a Redis client that takes this event and populates a cache, an ElasticSearch client that inserts a new document, or a client that publishes messages to third-party clients.

Hope this was helpful

What was not covered here is creating and deploying all this. If there is interest, I can do a Part 2 and include the necessary Terraform.
What I would cover in a Part 2:

  1. IAM roles and policies using Terraform
  2. Creation of Aurora, Kinesis, and Lambda resources using Terraform
  3. Building and zipping the Golang code and uploading it to S3 using a Makefile and Terraform