Amazon SQS #
Amazon Simple Queue Service (SQS) adalah layanan message queue terkelola sepenuhnya oleh AWS — tidak ada server yang perlu dikelola, tidak ada cluster yang dikonfigurasi. SQS sangat cocok untuk decoupling komponen di arsitektur cloud, memungkinkan setiap komponen berskala secara independen. AWS SDK untuk Go v2 (github.com/aws/aws-sdk-go-v2) adalah cara resmi mengakses SQS dari aplikasi Go.
Jenis Queue SQS #
Standard Queue:
- Throughput hampir tak terbatas
- At-least-once delivery (bisa duplikat)
- Urutan pesan tidak dijamin (best-effort)
- Cocok untuk: sebagian besar use case
FIFO Queue (nama harus berakhir .fifo):
- Throughput dibatasi (3000 TPS dengan batching)
- Exactly-once delivery (deduplikasi otomatis)
- Urutan pesan dijamin per Message Group ID
- Cocok untuk: e-commerce orders, financial transactions
Instalasi #
go get github.com/aws/aws-sdk-go-v2
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/sqs
Setup Client SQS #
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// newSQSClient builds an SQS client for the ap-southeast-1 region.
//
// Credentials and the remaining settings come from the SDK's default
// configuration chain (environment, ~/.aws/credentials, or an IAM role).
// A configuration loading failure is returned wrapped with the
// "load config" prefix.
func newSQSClient(ctx context.Context) (*sqs.Client, error) {
	const region = "ap-southeast-1"

	awsCfg, loadErr := config.LoadDefaultConfig(ctx, config.WithRegion(region))
	if loadErr != nil {
		return nil, fmt.Errorf("load config: %w", loadErr)
	}

	client := sqs.NewFromConfig(awsCfg)
	return client, nil
}
// newLocalSQSClient builds an SQS client for development and testing against
// LocalStack listening on http://localhost:4566.
//
// It uses the us-east-1 region and the static dummy credentials
// "test"/"test" instead of the default credential chain. A configuration
// loading failure is returned wrapped with the "load config" prefix.
func newLocalSQSClient(ctx context.Context) (*sqs.Client, error) {
	cfg, err := config.LoadDefaultConfig(ctx,
		config.WithRegion("us-east-1"),
		config.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider("test", "test", ""),
		),
	)
	if err != nil {
		// Add context so the caller can tell which step failed.
		return nil, fmt.Errorf("load config: %w", err)
	}

	// Override the endpoint so every request goes to LocalStack, not AWS.
	return sqs.NewFromConfig(cfg,
		func(o *sqs.Options) {
			o.BaseEndpoint = aws.String("http://localhost:4566")
		},
	), nil
}
Operasi Queue #
// getQueueURL resolves the URL of the queue called name. The URL is what the
// other queue operations in this file take as their queue identifier.
//
// A lookup failure is returned wrapped with the "get queue url" prefix.
func getQueueURL(ctx context.Context, client *sqs.Client, name string) (string, error) {
	input := &sqs.GetQueueUrlInput{QueueName: aws.String(name)}

	out, lookupErr := client.GetQueueUrl(ctx, input)
	if lookupErr != nil {
		return "", fmt.Errorf("get queue url: %w", lookupErr)
	}

	url := *out.QueueUrl
	return url, nil
}
// createQueue creates a standard queue called name and returns its URL.
//
// The queue is created with a 30 second visibility timeout, a one day
// retention period, 20 second long polling, and a redrive policy that sends
// a message to the hard-coded dead-letter queue ARN after 3 receives.
//
// A creation failure is returned wrapped with the queue name. If the service
// response carries no URL, the empty string is returned instead of panicking.
func createQueue(ctx context.Context, client *sqs.Client, name string) (string, error) {
	result, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName: aws.String(name),
		Attributes: map[string]string{
			"VisibilityTimeout":             "30",    // seconds
			"MessageRetentionPeriod":        "86400", // 1 day, in seconds
			"ReceiveMessageWaitTimeSeconds": "20",    // long polling
			// Dead-letter queue.
			"RedrivePolicy": `{"deadLetterTargetArn":"arn:aws:sqs:ap-southeast-1:123456789:queue-dlq","maxReceiveCount":"3"}`,
		},
	})
	if err != nil {
		return "", fmt.Errorf("create queue %q: %w", name, err)
	}
	return aws.ToString(result.QueueUrl), nil
}
// createFIFOQueue creates a FIFO queue and returns its URL.
//
// FIFO queue names must end in ".fifo". The suffix is appended to name only
// when it is missing, so both "orders" and "orders.fifo" create the queue
// "orders.fifo".
//
// The queue is created with content-based deduplication, a 30 second
// visibility timeout, a one day retention period and 20 second long polling.
// A creation failure is returned wrapped with the final queue name.
func createFIFOQueue(ctx context.Context, client *sqs.Client, name string) (string, error) {
	const fifoSuffix = ".fifo"

	queueName := name
	if !strings.HasSuffix(queueName, fifoSuffix) {
		queueName += fifoSuffix
	}

	result, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName: aws.String(queueName),
		Attributes: map[string]string{
			"FifoQueue":                     "true",
			"ContentBasedDeduplication":     "true", // deduplicate automatically based on the body
			"VisibilityTimeout":             "30",
			"MessageRetentionPeriod":        "86400",
			"ReceiveMessageWaitTimeSeconds": "20",
		},
	})
	if err != nil {
		return "", fmt.Errorf("create fifo queue %q: %w", queueName, err)
	}
	return aws.ToString(result.QueueUrl), nil
}
Send — Mengirim Pesan #
// sendMessage JSON-encodes body and sends it to queueURL as a single message.
//
// Two string message attributes are attached as metadata: "ContentType"
// ("application/json") and "Source" ("order-service"). Encoding and send
// errors are returned unchanged.
func sendMessage(ctx context.Context, client *sqs.Client, queueURL string, body interface{}) error {
	payload, err := json.Marshal(body)
	if err != nil {
		return err
	}

	// stringAttr builds a message attribute of data type "String".
	stringAttr := func(value string) types.MessageAttributeValue {
		return types.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(value),
		}
	}

	input := &sqs.SendMessageInput{
		QueueUrl:    aws.String(queueURL),
		MessageBody: aws.String(string(payload)),
		// Delivery delay in seconds (0-900); 0 delivers immediately.
		DelaySeconds: 0,
		MessageAttributes: map[string]types.MessageAttributeValue{
			"ContentType": stringAttr("application/json"),
			"Source":      stringAttr("order-service"),
		},
	}

	_, err = client.SendMessage(ctx, input)
	return err
}
// sendFIFOMessage JSON-encodes body and sends it to the FIFO queue at
// queueURL.
//
// groupID is sent as the message group ID; ordering is guaranteed within a
// group. deduplicationID is sent as the message deduplication ID. When it is
// empty the field is left unset, so a queue with content-based deduplication
// enabled can derive the ID from the body.
//
// An encoding failure is returned before anything is sent. Both encoding and
// send failures are wrapped with context.
func sendFIFOMessage(ctx context.Context, client *sqs.Client, queueURL string,
	groupID, deduplicationID string, body interface{}) error {
	data, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("marshal fifo message: %w", err)
	}

	input := &sqs.SendMessageInput{
		QueueUrl:       aws.String(queueURL),
		MessageBody:    aws.String(string(data)),
		MessageGroupId: aws.String(groupID),
	}
	if deduplicationID != "" {
		input.MessageDeduplicationId = aws.String(deduplicationID)
	}

	if _, err := client.SendMessage(ctx, input); err != nil {
		return fmt.Errorf("send fifo message: %w", err)
	}
	return nil
}
// sendBatch JSON-encodes messages and sends them to queueURL with
// SendMessageBatch, which is cheaper than one request per message.
//
// A batch request accepts at most 10 entries, so longer inputs are split into
// consecutive requests of up to 10. An empty input sends nothing and returns
// nil. Entry IDs are "msg-<index>", where index is the position in messages.
//
// All messages are encoded before the first request, so an encoding failure
// sends nothing. A failed request aborts the remaining chunks. Entries the
// service rejects are logged individually, and a non-nil error reporting the
// number of rejected entries is returned after all chunks were attempted.
func sendBatch(ctx context.Context, client *sqs.Client, queueURL string, messages []interface{}) error {
	const maxBatchEntries = 10

	if len(messages) == 0 {
		return nil
	}

	bodies := make([]string, 0, len(messages))
	for i, msg := range messages {
		data, err := json.Marshal(msg)
		if err != nil {
			return fmt.Errorf("marshal message %d: %w", i, err)
		}
		bodies = append(bodies, string(data))
	}

	failed := 0
	for start := 0; start < len(bodies); start += maxBatchEntries {
		end := start + maxBatchEntries
		if end > len(bodies) {
			end = len(bodies)
		}

		entries := make([]types.SendMessageBatchRequestEntry, 0, end-start)
		for i := start; i < end; i++ {
			entries = append(entries, types.SendMessageBatchRequestEntry{
				Id:          aws.String(fmt.Sprintf("msg-%d", i)),
				MessageBody: aws.String(bodies[i]),
			})
		}

		result, err := client.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{
			QueueUrl: aws.String(queueURL),
			Entries:  entries,
		})
		if err != nil {
			return fmt.Errorf("send batch [%d:%d]: %w", start, end, err)
		}

		if len(result.Failed) > 0 {
			log.Printf("%d pesan gagal dikirim", len(result.Failed))
			for _, f := range result.Failed {
				// aws.ToString tolerates the optional fields being nil.
				log.Printf("  Gagal ID=%s: %s", aws.ToString(f.Id), aws.ToString(f.Message))
			}
			failed += len(result.Failed)
		}
	}

	if failed > 0 {
		return fmt.Errorf("send batch: %d of %d messages failed", failed, len(messages))
	}
	return nil
}
Receive — Menerima dan Memproses Pesan #
// receiveMessages long-polls queueURL in a loop and passes every received
// message to handler.
//
// A message is deleted only after handler returns nil. When handler fails,
// the error is logged and the message is left on the queue, where it becomes
// visible again once its visibility timeout expires. A failed delete is
// logged and otherwise ignored.
//
// The loop ends when ReceiveMessage fails: nil is returned if ctx is already
// done at that point, otherwise the error wrapped with the "receive" prefix.
func receiveMessages(ctx context.Context, client *sqs.Client, queueURL string,
	handler func(msg types.Message) error) error {
	for {
		out, recvErr := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:              aws.String(queueURL),
			MaxNumberOfMessages:   10, // 1-10 messages per request
			WaitTimeSeconds:       20, // long polling: wait up to 20 seconds
			VisibilityTimeout:     30, // hide from other consumers for 30 seconds
			MessageAttributeNames: []string{"All"},
			AttributeNames:        []types.QueueAttributeName{"All"},
		})
		switch {
		case recvErr == nil:
			// Fall through to message handling below.
		case ctx.Err() != nil:
			// The context ended; that is a normal stop, not a failure.
			return nil
		default:
			return fmt.Errorf("receive: %w", recvErr)
		}

		for _, m := range out.Messages {
			procErr := handler(m)
			if procErr == nil {
				// Processed successfully, so remove it from the queue.
				_, delErr := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
					QueueUrl:      aws.String(queueURL),
					ReceiptHandle: m.ReceiptHandle,
				})
				if delErr != nil {
					log.Printf("Gagal hapus pesan: %v", delErr)
				}
				continue
			}

			// Leave the message alone: it reappears after the visibility
			// timeout, which can be extended if processing needs longer.
			log.Printf("Error proses pesan %s: %v", *m.MessageId, procErr)
		}
	}
}
// extendVisibility sets the visibility timeout of the message identified by
// receiptHandle to newTimeout. Use it when processing needs more time than
// the timeout the message was received with.
//
// The error from ChangeMessageVisibility is returned unchanged.
func extendVisibility(ctx context.Context, client *sqs.Client, queueURL string,
	receiptHandle *string, newTimeout int32) error {
	input := sqs.ChangeMessageVisibilityInput{
		QueueUrl:          aws.String(queueURL),
		ReceiptHandle:     receiptHandle,
		VisibilityTimeout: newTimeout,
	}

	if _, err := client.ChangeMessageVisibility(ctx, &input); err != nil {
		return err
	}
	return nil
}
// deleteBatch deletes messages from queueURL with DeleteMessageBatch.
//
// A batch request accepts at most 10 entries, so longer inputs are split into
// consecutive requests of up to 10. An empty input sends nothing and returns
// nil. Entry IDs are "del-<index>", where index is the position in messages.
//
// A failed request aborts the remaining chunks. Entries the service could not
// delete are logged individually, and a non-nil error reporting the number of
// failed entries is returned after all chunks were attempted.
func deleteBatch(ctx context.Context, client *sqs.Client, queueURL string,
	messages []types.Message) error {
	const maxBatchEntries = 10

	failed := 0
	for start := 0; start < len(messages); start += maxBatchEntries {
		end := start + maxBatchEntries
		if end > len(messages) {
			end = len(messages)
		}

		entries := make([]types.DeleteMessageBatchRequestEntry, 0, end-start)
		for i := start; i < end; i++ {
			entries = append(entries, types.DeleteMessageBatchRequestEntry{
				Id:            aws.String(fmt.Sprintf("del-%d", i)),
				ReceiptHandle: messages[i].ReceiptHandle,
			})
		}

		result, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
			QueueUrl: aws.String(queueURL),
			Entries:  entries,
		})
		if err != nil {
			return fmt.Errorf("delete batch [%d:%d]: %w", start, end, err)
		}

		if len(result.Failed) > 0 {
			log.Printf("%d pesan gagal dihapus", len(result.Failed))
			for _, f := range result.Failed {
				// aws.ToString tolerates the optional fields being nil.
				log.Printf("  Gagal ID=%s: %s", aws.ToString(f.Id), aws.ToString(f.Message))
			}
			failed += len(result.Failed)
		}
	}

	if failed > 0 {
		return fmt.Errorf("delete batch: %d of %d messages failed", failed, len(messages))
	}
	return nil
}
Contoh Program Lengkap — Job Queue #
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os/signal"
"syscall"
"time"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// Job is the unit of work exchanged through the queue. It is serialized to
// JSON and used as the message body.
type Job struct {
	// ID identifies the job in log output and error messages.
	ID string `json:"id"`
	// Type selects how the job is processed, e.g. "send_email".
	Type string `json:"type"`
	// Payload is the job-specific input, passed through as an opaque string.
	Payload string `json:"payload"`
	// Priority is a caller-assigned number carried along with the job.
	Priority int `json:"priority"`
	// Created is the timestamp supplied by whoever built the job.
	Created time.Time `json:"created"`
}
// JobResult describes the outcome of processing one job, in a JSON-friendly
// form.
type JobResult struct {
	// JobID is the ID of the job this result belongs to.
	JobID string `json:"job_id"`
	// Success reports whether the job completed without error.
	Success bool `json:"success"`
	// Output is the handler's output; omitted from JSON when empty.
	Output string `json:"output,omitempty"`
	// Error is the failure description; omitted from JSON when empty.
	Error string `json:"error,omitempty"`
	// Duration is how long processing took. encoding/json writes a
	// time.Duration as an integer number of nanoseconds.
	Duration time.Duration `json:"duration"`
	// ProcessedAt is the time processing took place.
	ProcessedAt time.Time `json:"processed_at"`
}
// JobQueue is a thin wrapper around one SQS queue: it pairs the SDK client
// with the URL of the queue that jobs are sent to and received from.
type JobQueue struct {
	// client is the SQS client used for every request.
	client *sqs.Client
	// queueURL is the URL of the queue this wrapper operates on.
	queueURL string
}
// NewJobQueue returns a JobQueue bound to queueURL, backed by an SQS client
// for the ap-southeast-1 region built from the SDK's default configuration.
//
// The configuration loading error, if any, is returned unchanged.
func NewJobQueue(ctx context.Context, queueURL string) (*JobQueue, error) {
	awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("ap-southeast-1"))
	if err != nil {
		return nil, err
	}

	jq := new(JobQueue)
	jq.client = sqs.NewFromConfig(awsCfg)
	jq.queueURL = queueURL
	return jq, nil
}
// Enqueue JSON-encodes job and sends it to the queue as one message.
//
// The job's type and priority are also attached as the "JobType" (String)
// and "Priority" (Number) message attributes, so they can be read without
// decoding the body. A successful send is logged.
//
// An encoding error is returned unchanged; a send error is wrapped with the
// job ID.
func (q *JobQueue) Enqueue(ctx context.Context, job Job) error {
	payload, err := json.Marshal(job)
	if err != nil {
		return err
	}

	attrs := map[string]types.MessageAttributeValue{
		"JobType": {
			DataType:    aws.String("String"),
			StringValue: aws.String(job.Type),
		},
		"Priority": {
			DataType:    aws.String("Number"),
			StringValue: aws.String(fmt.Sprintf("%d", job.Priority)),
		},
	}
	input := &sqs.SendMessageInput{
		QueueUrl:          aws.String(q.queueURL),
		MessageBody:       aws.String(string(payload)),
		MessageAttributes: attrs,
	}

	if _, sendErr := q.client.SendMessage(ctx, input); sendErr != nil {
		return fmt.Errorf("enqueue job %s: %w", job.ID, sendErr)
	}

	log.Printf("[QUEUE] Job %s (%s) dikirim", job.ID, job.Type)
	return nil
}
// Process receives jobs from the queue and runs handler on each one until ctx
// is done. It blocks for the lifetime of the worker.
//
// Messages are fetched with 20 second long polling, up to 5 per request, with
// a 60 second visibility timeout. For every message:
//   - a body that is missing or not valid JSON for a Job is logged and the
//     message is deleted, since retrying it cannot succeed;
//   - if handler returns an error, the message is left on the queue so it is
//     retried once its visibility timeout expires (and, if the queue has a
//     redrive policy, moved to the dead-letter queue after maxReceiveCount
//     receives);
//   - if handler succeeds, the message is deleted.
//
// A receive error is logged and retried after 5 seconds; that wait is cut
// short when ctx is done. Delete failures are logged, because they mean the
// message will be delivered again.
func (q *JobQueue) Process(ctx context.Context, handler func(Job) (string, error)) {
	const retryDelay = 5 * time.Second

	log.Println("[WORKER] Mulai memproses job...")
	for {
		// Single exit point: every path that notices a finished context
		// continues to here.
		if ctx.Err() != nil {
			log.Println("[WORKER] Berhenti")
			return
		}

		result, err := q.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:              aws.String(q.queueURL),
			MaxNumberOfMessages:   5,
			WaitTimeSeconds:       20,
			VisibilityTimeout:     60,
			MessageAttributeNames: []string{"All"},
		})
		if err != nil {
			if ctx.Err() != nil {
				continue
			}
			log.Printf("[WORKER] Receive error: %v", err)
			// Back off before retrying, but do not delay shutdown.
			select {
			case <-ctx.Done():
			case <-time.After(retryDelay):
			}
			continue
		}

		for _, msg := range result.Messages {
			q.handleMessage(msg, handler)
		}
	}
}

// handleMessage decodes one message into a Job, runs handler on it, and
// deletes the message unless handler failed.
func (q *JobQueue) handleMessage(msg types.Message, handler func(Job) (string, error)) {
	var job Job
	// aws.ToString maps a nil body to "", which fails decoding and takes the
	// same path as any other undecodable message instead of panicking.
	if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &job); err != nil {
		log.Printf("[WORKER] Error unmarshal: %v", err)
		q.deleteMessage(msg)
		return
	}

	start := time.Now()
	log.Printf("[WORKER] Proses job %s (%s)", job.ID, job.Type)
	output, err := handler(job)
	duration := time.Since(start)
	if err != nil {
		log.Printf("[WORKER] Job %s GAGAL (%v): %v", job.ID, duration, err)
		// Deliberately not deleted: the message returns to the queue.
		return
	}

	log.Printf("[WORKER] Job %s SELESAI (%v): %s", job.ID, duration, output)
	q.deleteMessage(msg)
}

// deleteMessage removes msg from the queue and logs a failure to do so.
func (q *JobQueue) deleteMessage(msg types.Message) {
	const deleteTimeout = 5 * time.Second

	// Use a fresh, bounded context rather than the worker's: a job that
	// finished while shutdown was in progress must still be acknowledged,
	// otherwise it would be delivered and processed a second time.
	delCtx, cancel := context.WithTimeout(context.Background(), deleteTimeout)
	defer cancel()

	_, err := q.client.DeleteMessage(delCtx, &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(q.queueURL),
		ReceiptHandle: msg.ReceiptHandle,
	})
	if err != nil {
		log.Printf("[WORKER] Gagal hapus pesan %s: %v", aws.ToString(msg.MessageId), err)
	}
}
// processJob simulates the work for a job, selected by job.Type. It sleeps
// for a duration that depends on the type and then returns a message that
// includes job.Payload.
//
// An unknown job type returns an empty string and an error immediately,
// without sleeping.
func processJob(job Job) (string, error) {
	var (
		delay  time.Duration
		format string
	)

	switch job.Type {
	case "send_email":
		delay, format = 100*time.Millisecond, "Email terkirim untuk payload: %s"
	case "resize_image":
		delay, format = 500*time.Millisecond, "Gambar %s berhasil di-resize"
	case "generate_report":
		delay, format = 2*time.Second, "Laporan %s berhasil dibuat"
	case "sync_data":
		delay, format = 300*time.Millisecond, "Data %s berhasil disinkronkan"
	default:
		return "", fmt.Errorf("jenis job tidak dikenal: %s", job.Type)
	}

	// Stand-in for the real work.
	time.Sleep(delay)
	return fmt.Sprintf(format, job.Payload), nil
}
// main enqueues five sample jobs and then processes the queue until the
// process receives SIGINT or SIGTERM.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the context on SIGINT/SIGTERM for a graceful shutdown.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)
	go func() {
		select {
		case <-sigCh:
			log.Println("Menerima sinyal, shutdown...")
			cancel()
		case <-ctx.Done():
			// main is returning for another reason; nothing left to do.
		}
	}()

	// In production use the real queue URL from AWS.
	queueURL := "https://sqs.ap-southeast-1.amazonaws.com/123456789/job-queue"
	// For LocalStack development:
	// queueURL := "http://localhost:4566/000000000000/job-queue"

	queue, err := NewJobQueue(ctx, queueURL)
	if err != nil {
		// Fatalf rather than Fatal: Fatal would print the error directly
		// after the colon with no separating space.
		log.Fatalf("Buat job queue: %v", err)
	}

	// Send a few jobs. The e-mail payloads are placeholder addresses.
	fmt.Println("=== Mengirim Job ===")
	jobs := []Job{
		{ID: "job-001", Type: "send_email", Payload: "user@example.com", Priority: 1, Created: time.Now()},
		{ID: "job-002", Type: "resize_image", Payload: "photo-123.jpg", Priority: 2, Created: time.Now()},
		{ID: "job-003", Type: "generate_report", Payload: "monthly-Q4", Priority: 3, Created: time.Now()},
		{ID: "job-004", Type: "sync_data", Payload: "users-table", Priority: 1, Created: time.Now()},
		{ID: "job-005", Type: "send_email", Payload: "admin@example.com", Priority: 1, Created: time.Now()},
	}
	for _, job := range jobs {
		if err := queue.Enqueue(ctx, job); err != nil {
			log.Printf("Gagal enqueue: %v", err)
		}
	}

	// Process jobs; this blocks until the context is cancelled.
	fmt.Println("\n=== Memproses Job ===")
	queue.Process(ctx, processJob)
}
Ringkasan #
- Standard Queue untuk throughput tinggi; FIFO Queue (nama berakhir `.fifo`) untuk urutan yang dijamin.
- Long polling (`WaitTimeSeconds: 20`) mengurangi biaya dan latensi vs short polling — selalu gunakan ini.
- Visibility timeout menyembunyikan pesan dari consumer lain saat diproses; perpanjang dengan `ChangeMessageVisibility` untuk proses lama.
- Hapus pesan dengan `DeleteMessage` setelah berhasil diproses — at-least-once delivery.
- Dead Letter Queue untuk pesan yang gagal setelah `maxReceiveCount` kali — konfigurasi via `RedrivePolicy`.
- Batch send (`SendMessageBatch`) dan batch delete (`DeleteMessageBatch`) untuk mengurangi biaya API call.
- Message attributes untuk metadata tanpa harus parse body — berguna untuk routing di consumer.
- FIFO deduplication: `ContentBasedDeduplication` (SHA-256 body) atau `MessageDeduplicationId` eksplisit.
- `MessageGroupId` di FIFO queue untuk menjamin urutan dalam satu group (misalnya per OrderID).
- IAM role adalah cara terbaik untuk autentikasi di EC2/ECS/Lambda — tidak perlu hard-code credentials.