go-sqs-worker
go-sqs-worker is a Go library designed to manage asynchronous jobs using AWS SQS. It provides a simple, scalable way
to produce and consume tasks, making it ideal for background job processing.
Features
- Producer and Consumer Support: Enables the creation and handling of jobs with distinct producer and consumer components.
- Scheduled Job Processing: Supports the scheduling of jobs for processing at a later time.
- Dead Letter Queue Integration: Provides built-in support for Dead Letter Queues (DLQs) to ensure reliable message processing.
- Configurable Retry Mechanism: Allows customization of the number of retries and delay intervals for failed jobs.
- Graceful Shutdown: Manages context cancellation and cleanup during termination.
- LocalStack Compatibility: Fully compatible with LocalStack for local development and testing.
Installation
Add the library to your project using go get:
go get github.com/mickamy/go-sqs-worker
Usage
Producer Example
The producer is responsible for sending jobs to the SQS queue.
package main import ( "context" "fmt" "github.com/mickamy/go-sqs-worker/producer" "github.com/mickamy/go-sqs-worker/message" "github.com/mickamy/go-sqs-worker-example/internal/job" "github.com/mickamy/go-sqs-worker-example/internal/lib/aws" ) func main() { ctx := context.Background() cfg := producer.Config{ WorkerQueueURL: "http://localhost.localstack.cloud:4566/000000000000/worker-queue", } p, err := producer.New(cfg, aws.NewSQSClient(ctx)) if err != nil { fmt.Println("failed to create producer", "error", err) return } msg, err := message.New(ctx, job.SuccessfulJobType.String(), job.SuccessfulJobPayload{ Message: "hello", }) if err != nil { fmt.Println("failed to create successful job message", "error", err) return } if err := p.Do(ctx, msg); err != nil { fmt.Println("failed to produce successful job", "error", err) return } }
Consumer Example
The consumer retrieves and processes jobs from the SQS queue.
package main import ( "context" "fmt" "github.com/mickamy/go-sqs-worker-example/internal/lib/aws" "github.com/mickamy/go-sqs-worker/consumer" ) func main() { ctx := context.Background() cfg := consumer.Config{ WorkerQueueURL: "http://localhost.localstack.cloud:4566/000000000000/worker-queue", WorkerQueueARN: "arn:aws:sqs:ap-northeast-1:000000000000:worker-queue", SchedulerRoleARN: "arn:aws:iam::000000000000:role/scheduler-role", SchedulerTimeZone: "UTC", DeadLetterQueueURL: "http://localhost.localstack.cloud:4566/000000000000/dead-letter-queue", } c, err := consumer.New(cfg, aws.NewSQSClient(ctx), aws.NewSchedulerClient(ctx), job.GetJobHandler) if err != nil { fmt.Println("failed to create consumer", "error", err) return } c.Do(ctx) }
Example Project
The repository includes an example project under the example/ directory. To run the example:
Integration with go-sqs-worker-viewer
For monitoring and managing jobs in real-time, use go-sqs-worker-viewer. Configure RedisURL in the consumer/producer to enable integration. Redis-compatible servers such as Redis or Valkey are supported:
consumer.Config{
// ... other configurations
RedisURL: "redis://localhost:6379",
}
producer.Config{
// ... other configurations
RedisURL: "redis://localhost:6379",
}
This setup allows go-sqs-worker-viewer to display job statuses and processing metrics.
If you prefer configuration via environment variables, set REDIS_URL to the connection string of your Redis or Valkey
instance.
Configuration
Producer Configuration
- WorkerQueueURL: Required. The URL of the SQS queue where worker messages are stored.
- WorkerQueueARN: Optional. The ARN of the SQS queue where worker messages are stored. This is required if you use scheduled jobs.
- SchedulerRoleARN: Optional. The ARN of the IAM role used by EventBridge Scheduler. This is required if you use scheduled jobs.
- SchedulerTimeZone: Optional. The time zone used by EventBridge Scheduler. This is required if you use scheduled jobs.
- BeforeProduceFunc: Optional. A function to execute before a message is produced. The default function does nothing.
- AfterProduceFunc: Optional. A function to execute after a message is produced. The default function does nothing.
- RedisURL: Optional. The URL of the Redis-compatible server (e.g., Redis, Valkey) for storing job processing status. This is particularly useful for tracking job-related data during processing, especially when used with go-sqs-worker-viewer.
Consumer Configuration
- WorkerQueueURL: Required. The URL of the SQS queue where worker messages are stored.
- WorkerQueueARN: Optional. The ARN of the SQS queue where worker messages are stored. This is required when enabling delayed retries via EventBridge Scheduler.
- SchedulerRoleARN: Optional. The ARN of the IAM role used by EventBridge Scheduler. This is required when enabling delayed retries via EventBridge Scheduler.
- SchedulerTimeZone: Optional. The time zone used by EventBridge Scheduler. This is required when enabling delayed retries via EventBridge Scheduler.
- DeadLetterQueueURL: Optional. The URL of the Dead Letter Queue (DLQ) for messages that fail to process after the maximum number of retries. If not set, the DLQ is not used.
- RedisURL: Optional. The URL of the Redis-compatible server (e.g., Redis, Valkey) for storing job processing status. This is particularly useful for tracking job-related data during processing, especially when used with go-sqs-worker-viewer.
- MaxRetry: Optional. The maximum number of retry attempts for a failed job. The default value is 5 retries.
- BaseDelay: Optional. The initial delay (in seconds) before retrying a failed job. This value is used as the base for calculating exponential backoff delays. The default value is 30 seconds.
- MaxDelay: Optional. The maximum delay (in seconds) between retries, used to cap the exponential backoff delay. The default value is 900 seconds (which is the maximum visibility timeout for SQS).
- WaitTimeSeconds: Optional. The maximum time (in seconds) to wait for a message to be received from the SQS queue. This value is used for long polling. The default value is 20 seconds. The maximum allowed value is 20 seconds.
- TimeoutSeconds: Optional. The maximum time (in seconds) to wait for a message to be processed. This value is used to limit the time spent on processing a message. The default value is 300 seconds. Note that actual timeout seconds will be WaitTimeSeconds + TimeoutSeconds.
- BeforeProcessFunc: Optional. A function to execute before processing a message. The default function does nothing.
- AfterProcessFunc: Optional. A function to execute after processing a message. The default function does nothing.
Documentation
GoDoc documentation is available at pkg.go.dev.
License
This library is licensed under the MIT License. See the LICENSE file for details.