Today we will be discussing a simple architecture for capturing event data via an API and converting it to the Parquet format for long-term storage and analytical querying.
Why: Your company wants to create its first “data lake”. This has become a very catchy term, and everyone is looking for big data 2.0. Your company wants to build something in house to avoid vendor lock-in and the big bills normally associated with large vendors. Plus, you're part of the company, so who knows your data needs better than you, right?
We need something fast, that scales horizontally, and that preferably does not cost an arm and a leg. This brings us to the above architecture: a simple Golang API using the chi router, Kinesis Firehose for capturing events, AWS Glue for converting batched JSON events into the Parquet format, and finally S3 for long-term storage, with the added benefit of querying the data using Amazon Athena.
Why Parquet?
We will be running analytical queries, and a column-based format makes sense. Parquet also enables huge savings in storage costs due to its compact, columnar file format.
Let's start by writing a simple Golang REST API using the chi router. The packages used:
- Simple retryer: github.com/avast/retry-go
- AWS Golang SDK v2: github.com/aws/aws-sdk-go-v2
- Firehose API: github.com/aws/aws-sdk-go-v2/service/firehose
- AWS config: github.com/aws/aws-sdk-go-v2/config
- Chi: github.com/go-chi/chi/v5
- Validator: github.com/go-playground/validator/v10
First let's define a simple config file; I prefer JSON. Here we are defining a basic auth user, where the key is the username and the value is the password. This will be used later in our chi middleware. consumers, batchLimit, and purgeTimeInMillisecond will be used for our async PUT and background consumers.
config.json
{
  "basic_auth": {
    "user": "password"
  },
  "port": 9001,
  "env": "prd",
  "region": "us-west-2",
  "streamName": "<kinesis-firehose-stream-name>",
  "consumers": 10,
  "batchLimit": 50,
  "purgeTimeInMillisecond": 30000,
  "retry": {
    "attempts": 3,
    "delay_in_seconds": 1
  }
}

Now let's get to coding, starting with our simple main.go:
package main

import (
	"json-to-parquet/internal/app"
	"log"
)

func main() {
	// to change the flags on the default logger
	log.SetFlags(log.LstdFlags | log.Lshortfile)

	app.Start()
}
Let's define our request model and a simple validator:
package post

import (
	"sync"

	"github.com/go-playground/validator/v10"
)

// Interface - post model interface
type Interface interface {
	Validate() (bool, []error)
}

// Request model
// note: `required` treats Go's zero value as missing, so event_type_id=0
// will fail validation even though oneof allows it
type Request struct {
	EventTypeId int    `json:"event_type_id" validate:"required,numeric,oneof=0 1 2"`
	UserId      int64  `json:"user_id" validate:"numeric"`
	Timestamp   string `json:"timestamp"`
}

var validate *validator.Validate
var once sync.Once

// Validate - validate given struct
func (r Request) Validate() (bool, []error) {
	once.Do(func() {
		validate = validator.New()
	})

	errs := make([]error, 0)
	if err := validate.Struct(r); nil != err {
		for _, err := range err.(validator.ValidationErrors) {
			errs = append(errs, err)
		}
		return false, errs
	}
	return true, errs
}
Next we are going to define some helpers for things like decoding the POST body and reading in our JSON config file.
config.go
package config

import (
	"encoding/json"
	"io/ioutil"
)

// Config - application configuration
type Config struct {
	BasicAuth              map[string]string `json:"basic_auth"`
	Env                    string            `json:"env"`
	Region                 string            `json:"region"`
	StreamName             string            `json:"streamName"`
	Consumers              int               `json:"consumers"`
	BatchLimit             int               `json:"batchLimit"`
	PurgeTimeInMillisecond int               `json:"purgeTimeInMillisecond"`
	Port                   int               `json:"port"`
	Retry                  Retryer           `json:"retry"`
}

// Retryer - retry settings
type Retryer struct {
	Attempts       int `json:"attempts"`
	DelayInSeconds int `json:"delay_in_seconds"`
}

// GetConfig get the config given a path
func GetConfig(configPath string) (Config, error) {
	// attempt to load config from config file path
	content, err := ioutil.ReadFile(configPath)
	if err != nil {
		return Config{}, err
	}

	var cfg Config
	err = json.Unmarshal(content, &cfg)
	if err != nil {
		return Config{}, err
	}
	return cfg, nil
}
requests.go
package requests

import (
	"encoding/json"
	"io/ioutil"
	"net/http"
)

// MalformedRequest - bad request
type MalformedRequest struct {
	status int
	msg    string
}

// Error - satisfies the error interface
func (mr *MalformedRequest) Error() string {
	return mr.msg
}

// Status - returns the http status code
func (mr *MalformedRequest) Status() int {
	return mr.status
}

// DecodeJSONBody attempts to decode a json body payload, returns an error if it fails
func DecodeJSONBody(w http.ResponseWriter, r *http.Request, dst interface{}, maxBytes int64) error {
	if r.Body == nil {
		return &MalformedRequest{status: http.StatusBadRequest, msg: "empty body"}
	}

	r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
	defer r.Body.Close()

	body, err := ioutil.ReadAll(r.Body)
	if nil != err {
		return &MalformedRequest{status: http.StatusBadRequest, msg: err.Error()}
	}

	err = json.Unmarshal(body, &dst)
	if err != nil {
		return &MalformedRequest{status: http.StatusBadRequest, msg: err.Error()}
	}
	return nil
}
Next we need a way to connect to AWS Kinesis Firehose. We are using sdk-v2 here, which is a big update from their original SDK. First we create a simple config loader; this can be used for all AWS clients in the v2 SDK.
connections.go
package connections

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
)

// AwsConfigLoaderInterface - allows mocking the config loader
type AwsConfigLoaderInterface interface {
	Load(ctx context.Context, optFns ...func(*config.LoadOptions) error) (aws.Config, error)
}

type AwsConfigLoader struct{}

// Load - loads aws config
func (a *AwsConfigLoader) Load(ctx context.Context, optFns ...func(*config.LoadOptions) error) (aws.Config, error) {
	return config.LoadDefaultConfig(ctx, optFns...)
}
Now let's create our Kinesis Firehose connection:
package firehose

import (
	"context"

	"json-to-parquet/internal/connections"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/firehose"
)

// New - new connection to kinesis firehose
func New(ctx context.Context, loader connections.AwsConfigLoaderInterface, optFns ...func(*config.LoadOptions) error) (*firehose.Client, error) {
	cfg, err := loader.Load(ctx, optFns...)
	if err != nil {
		return nil, err
	}

	client := firehose.NewFromConfig(cfg)
	return client, nil
}
Finally, we will create a producer to be used in our handler.
producer.go
package producer

import "context"

// Interface - producer interface
type Interface interface {
	Put(ctx context.Context, data [][]byte, name string) (Response, error)
}

// Response - response from put
type Response interface{}
firehose.go
package firehose

import (
	"context"

	"json-to-parquet/internal/producer"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/firehose"
	"github.com/aws/aws-sdk-go-v2/service/firehose/types"
)

// KinesisClient - allows mocking the firehose client
type KinesisClient interface {
	PutRecordBatch(ctx context.Context, params *firehose.PutRecordBatchInput, optFns ...func(*firehose.Options)) (*firehose.PutRecordBatchOutput, error)
}

// Producer - kinesis firehose client
type Producer struct{ KinesisClient }

// Put - satisfies producer.Interface
func (p *Producer) Put(ctx context.Context, data [][]byte, name string) (producer.Response, error) {
	out, err := p.PutRecordBatch(
		ctx,
		&firehose.PutRecordBatchInput{
			DeliveryStreamName: aws.String(name),
			Records:            Records(data),
		},
	)
	return out, err
}

// Records - convert [][]byte to kinesis types.Record
func Records(data [][]byte) []types.Record {
	ret := make([]types.Record, 0, len(data))
	for _, record := range data {
		ret = append(ret, types.Record{Data: record})
	}
	return ret
}
Great, now that we have all of our helpers, interfaces, producer, and configs, let's build out our chi router and put everything together.
We have separated the core logic into two files: app.go and start.go.
start.go builds our dependencies, starts consumers, and finally launches an HTTP server.
app.go is where our routing logic and handler code will go. Here we add some basic middleware and decode the POST request into our defined struct. We then do some basic validation on the struct and finally put the JSON/byte payload onto a channel. We have defined a batch limit and a number of background consumers whose job is to place the batches onto Kinesis Firehose, where the AWS Glue service will transform our JSON to the Parquet format before storing the file in S3.
app.go
package app

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"sync"
	"time"

	"json-to-parquet/internal/config"
	"json-to-parquet/internal/models/post"
	"json-to-parquet/internal/producer"
	"json-to-parquet/internal/requests"

	"github.com/avast/retry-go"
	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
)

// App - holds the config, producer, and worker state
type App struct {
	Config   *config.Config
	Producer producer.Interface
	workChan chan [][]byte // buffered channel
	wg       sync.WaitGroup
}

type RequestError struct {
	ErrorCodes []string `json:"errorCodes"`
}

type ErrorCode int

const (
	PutFailed ErrorCode = iota
)

// Error - satisfies the error interface
func (er ErrorCode) Error() (s string) {
	s = "unknown error"
	switch er {
	case PutFailed:
		return "failed to save record"
	}
	return
}

// New creates a new App
func New(cfg config.Config, producer producer.Interface) *App {
	return &App{
		Config:   &cfg,
		Producer: producer,
		workChan: make(chan [][]byte, 5000),
	}
}

// router function handles assignment of routes to handlers
// define your paths and middleware here
func (a *App) router() http.Handler {
	r := chi.NewRouter()

	// A good base middleware stack
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(middleware.Logger)
	r.Use(middleware.Recoverer)

	// Set a timeout value on the request context (ctx), that will signal
	// through ctx.Done() that the request has timed out and further
	// processing should be stopped.
	r.Use(middleware.Timeout(10 * time.Second))

	r.Use(middleware.AllowContentType("application/json"))
	r.Use(middleware.BasicAuth("WWW-Authenticate", a.Config.BasicAuth))

	// handler for the POST event endpoint
	r.Post("/v1/event", a.postRequestHandler)

	return r
}

// CloseChannel - close the worker channel and wait for consumers to drain
func (a *App) CloseChannel() {
	// close the worker channel
	close(a.workChan)
	// wait for all work to complete
	a.wg.Wait()
}

// StartConsumers - start the background consumers
func (a *App) StartConsumers() {
	// launch firehose consumers in goroutines
	// (wg.Add happens here, before the goroutines start, so CloseChannel
	// cannot race past a consumer that has not registered yet)
	for i := 0; i < a.Config.Consumers; i++ {
		a.wg.Add(1)
		go a.consumer(a.Config.PurgeTimeInMillisecond)
	}
}

// consumer - drains the work channel, flushing batches to firehose
func (a *App) consumer(purge int) {
	defer a.wg.Done()

	if purge == 0 {
		purge = 30000 // 30000 milliseconds = 30 seconds
	}

	batches := make([][]byte, 0, a.Config.BatchLimit)
	flushTicker := time.NewTicker(time.Duration(purge) * time.Millisecond)
	defer flushTicker.Stop()

	for loop := true; loop; {
		select {
		// flush batches on the purge interval just in case
		case <-flushTicker.C:
			if len(batches) > 0 {
				// batches put onto firehose
				a.PutRequest(context.Background(), batches)
				// clear batch
				batches = make([][]byte, 0, a.Config.BatchLimit)
			}
		case wrk, ok := <-a.workChan:
			if !ok {
				loop = false
				break
			}
			batches = append(batches, wrk...)
			// we have reached the batch limit, BulkPut onto Kinesis
			// (>= because a single request can append several records at once)
			if len(batches) >= a.Config.BatchLimit {
				a.PutRequest(context.Background(), batches)
				// clear batch
				batches = make([][]byte, 0, a.Config.BatchLimit)
			}
		}
	}

	// flush anything left before exiting
	if len(batches) > 0 {
		a.PutRequest(context.Background(), batches)
	}
}

// Put - put post request onto processing channel
func (a *App) Put(ct [][]byte) {
	a.workChan <- ct
}

// postRequestHandler decodes and validates the POST request and places it on the work channel
func (a *App) postRequestHandler(w http.ResponseWriter, r *http.Request) {
	var data []post.Request

	err := requests.DecodeJSONBody(w, r, &data, 1024*1024*3) // 3MB limit, move to config
	if err != nil {
		var mr *requests.MalformedRequest
		if errors.As(err, &mr) {
			StatusBadRequest(w, mr.Error())
		} else {
			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		}
		return
	}

	// validate
	put, err := Validate(data)
	if nil != err {
		StatusBadRequest(w, err.Error())
		return
	}

	a.Put(put)
	w.WriteHeader(http.StatusAccepted)
}

// Validate - validate each request and marshal it to bytes
// If invalid return error and stop
func Validate(data []post.Request) ([][]byte, error) {
	ret := make([][]byte, 0, len(data))
	for _, ct := range data {
		if ok, errs := ct.Validate(); !ok {
			return ret, errs[0]
		}

		js, err := json.Marshal(ct)
		if nil != err {
			log.Printf("Json Marshal Err: %s", err.Error())
			continue
		}
		ret = append(ret, js)
	}
	return ret, nil
}

// StatusBadRequest - set header, set status as 400, return error code
func StatusBadRequest(w http.ResponseWriter, s string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusBadRequest)
	json.NewEncoder(w).Encode(RequestError{ErrorCodes: []string{s}})
}

// PutRequest - put a batch onto kinesis firehose, retrying on failure
func (a *App) PutRequest(ctx context.Context, data [][]byte) (err error) {
	if len(data) == 0 {
		return nil
	}

	// retry the kinesis firehose put if it fails
	attempts, delay := uint(a.Config.Retry.Attempts), time.Second*time.Duration(a.Config.Retry.DelayInSeconds)
	err = retry.Do(
		func() error {
			_, err := a.Producer.Put(ctx, data, a.Config.StreamName)
			return err
		},
		retry.OnRetry(func(n uint, err error) {
			log.Printf("Attempt: %d, Error: %s, Retrying...", n, err.Error())
		}),
		retry.Delay(delay),
		retry.Attempts(attempts),
		retry.Context(ctx),
	)
	return err
}
And finally, start.go:
package app

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"time"

	internalconfig "json-to-parquet/internal/config"
	"json-to-parquet/internal/connections"
	firehose "json-to-parquet/internal/connections/firehose"
	producer "json-to-parquet/internal/producer/firehose"

	"github.com/aws/aws-sdk-go-v2/config"
)

// Start initializes and runs the webserver
// Adapted from https://github.com/gorilla/mux#graceful-shutdown
func Start() {
	var wait time.Duration
	var configFilePath string
	flag.DurationVar(&wait, "graceful-timeout", time.Second*15, "the duration for which the server gracefully waits for existing connections to finish - e.g. 15s or 1m")
	flag.StringVar(&configFilePath, "config-filepath", "../../config/config.json", "the file path for the config file")
	flag.Parse()

	// attempt to load config from config file path
	cfg, err := internalconfig.GetConfig(configFilePath)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	firehoseClient, err := firehose.New(ctx, &connections.AwsConfigLoader{}, config.WithRegion(cfg.Region))
	if nil != err {
		log.Fatalln(err)
	}

	application := New(
		cfg,
		&producer.Producer{KinesisClient: firehoseClient},
	)

	// launch firehose consumers in goroutines
	application.StartConsumers()

	srv := &http.Server{
		Addr: fmt.Sprintf("0.0.0.0:%d", cfg.Port),
		// Good practice to set timeouts to avoid Slowloris attacks.
		WriteTimeout: time.Second * 15,
		ReadTimeout:  time.Second * 15,
		IdleTimeout:  time.Second * 60,
		Handler:      application.router(), // Pass our chi router in.
	}

	// Run our server in a goroutine so that it doesn't block.
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Println(err)
		}
	}()

	c := make(chan os.Signal, 1)
	// We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C)
	// SIGKILL, SIGQUIT or SIGTERM will not be caught.
	signal.Notify(c, os.Interrupt)

	// Block until we receive our signal.
	<-c

	// Create a deadline to wait for.
	ctx, cancel := context.WithTimeout(ctx, wait)
	defer cancel()
	// Doesn't block if no connections, but will otherwise wait
	// until the timeout deadline.
	srv.Shutdown(ctx)

	// close worker channel and flush any remaining batches
	application.CloseChannel()

	// Optionally, you could run srv.Shutdown in a goroutine and block on
	// <-ctx.Done() if your application should wait for other services
	// to finalize based on context cancellation.
	log.Println("shutting down")
	os.Exit(0)
}
The final part is creating a Glue schema and attaching it to the Kinesis Firehose delivery stream during the creation process.
First, create a bucket in S3. I named mine json-to-parquet. This bucket will be used to save the .parquet files.
Next we will go to AWS Glue and create our schema. This is pulled from the model we created in Go. Remember our Request struct:
// Request model
type Request struct {
	EventTypeId int    `json:"event_type_id" validate:"required,numeric,oneof=0 1 2"`
	UserId      int64  `json:"user_id" validate:"numeric"`
	Timestamp   string `json:"timestamp"`
}

Let's create a database in Glue.
We need to add a table to our database and define some of its properties.
And finally, let's define our schema. We will be using the schema from the Request struct we defined in our Go code, with user_id as a partition key.
Our final step is creating our new Kinesis Firehose delivery stream and attaching our S3 bucket and Glue table. We will be using Direct PUT from our handler code.
We have enabled record format conversion and attached our Glue table
Finally, we have set up our S3 destination and prefixes. Compression is automatically enabled.
Now that we have completed our infrastructure, all that's left is deploying our Go API and sending some test events.
We now have a single API endpoint, /v1/event, to accept and validate requests, marshal them into a consumable format, and populate a Kinesis Firehose stream.
Glue will automatically convert our JSON payload to the parquet format in bulk and store in S3.
Our data team can then run Athena queries on this data.
That's it. There is more to this, mainly deploying the Golang code and working with Amazon IAM roles and policies to put everything together.
Hope you liked this post, please comment below!