Amazon SQS #

Amazon Simple Queue Service (SQS) adalah layanan message queue terkelola sepenuhnya oleh AWS — tidak ada server yang perlu dikelola, tidak ada cluster yang dikonfigurasi. SQS sangat cocok untuk decoupling komponen di arsitektur cloud, memungkinkan setiap komponen berskala secara independen. AWS SDK untuk Go v2 (github.com/aws/aws-sdk-go-v2) adalah cara resmi mengakses SQS dari aplikasi Go.

Jenis Queue SQS #

Standard Queue:
  - Throughput hampir tak terbatas
  - At-least-once delivery (bisa duplikat)
  - Urutan pesan tidak dijamin (best-effort)
  - Cocok untuk: sebagian besar use case

FIFO Queue (nama harus berakhir .fifo):
  - Throughput dibatasi (300 operasi/detik tanpa batching; hingga 3000 pesan/detik dengan batching)
  - Exactly-once processing (deduplikasi otomatis dalam jendela deduplikasi 5 menit)
  - Urutan pesan dijamin per Message Group ID
  - Cocok untuk: e-commerce orders, financial transactions

Instalasi #

go get github.com/aws/aws-sdk-go-v2
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/sqs

Setup Client SQS #

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// newSQSClient returns an SQS client bound to the ap-southeast-1 region.
//
// Credentials are resolved by the SDK's default configuration loader
// (environment, ~/.aws/credentials, or an IAM role). A failure to load that
// configuration is returned wrapped with a "load config" prefix.
func newSQSClient(ctx context.Context) (*sqs.Client, error) {
	region := config.WithRegion("ap-southeast-1")

	awsCfg, loadErr := config.LoadDefaultConfig(ctx, region)
	if loadErr != nil {
		return nil, fmt.Errorf("load config: %w", loadErr)
	}

	client := sqs.NewFromConfig(awsCfg)
	return client, nil
}

// newLocalSQSClient returns an SQS client for development and testing against
// LocalStack listening on http://localhost:4566.
//
// It uses region us-east-1 and the static dummy credentials "test"/"test", and
// overrides the client's base endpoint so no request reaches AWS.
// A failure to load the configuration is returned wrapped with a
// "load config" prefix.
func newLocalSQSClient(ctx context.Context) (*sqs.Client, error) {
	cfg, err := config.LoadDefaultConfig(ctx,
		config.WithRegion("us-east-1"),
		config.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider("test", "test", ""),
		),
	)
	if err != nil {
		// Wrapped the same way as newSQSClient so callers can tell which step failed.
		return nil, fmt.Errorf("load config: %w", err)
	}

	return sqs.NewFromConfig(cfg, func(o *sqs.Options) {
		o.BaseEndpoint = aws.String("http://localhost:4566") // LocalStack
	}), nil
}

Operasi Queue #

// getQueueURL looks up the URL of the queue called name.
// The queue URL is the handle the other queue operations in this file take.
//
// A lookup failure is returned wrapped with a "get queue url" prefix.
func getQueueURL(ctx context.Context, client *sqs.Client, name string) (string, error) {
	input := sqs.GetQueueUrlInput{QueueName: aws.String(name)}

	out, lookupErr := client.GetQueueUrl(ctx, &input)
	if lookupErr == nil {
		return *out.QueueUrl, nil
	}
	return "", fmt.Errorf("get queue url: %w", lookupErr)
}

// createQueue creates a standard queue called name and returns its URL.
//
// The queue is created with a 30 second visibility timeout, one day of message
// retention, 20 second long polling, and a redrive policy that moves a message
// to the dead-letter queue after 3 receives.
//
// NOTE: the dead-letter target ARN below is a hard-coded example value; replace
// it with the ARN of an existing DLQ in your own account before use.
//
// A creation failure is returned wrapped with the queue name for context.
func createQueue(ctx context.Context, client *sqs.Client, name string) (string, error) {
	result, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName: aws.String(name),
		Attributes: map[string]string{
			"VisibilityTimeout":             "30",    // seconds
			"MessageRetentionPeriod":        "86400", // 1 day
			"ReceiveMessageWaitTimeSeconds": "20",    // long polling
			// Dead-letter queue
			"RedrivePolicy": `{"deadLetterTargetArn":"arn:aws:sqs:ap-southeast-1:123456789:queue-dlq","maxReceiveCount":"3"}`,
		},
	})
	if err != nil {
		// Wrapped like getQueueURL so the failing operation is visible to callers.
		return "", fmt.Errorf("create queue %q: %w", name, err)
	}
	return *result.QueueUrl, nil
}

// createFIFOQueue creates a FIFO queue and returns its URL.
//
// FIFO queue names must end in ".fifo". The suffix is appended only when name
// does not already carry it, so both "orders" and "orders.fifo" create the
// queue "orders.fifo".
//
// The queue is created with content-based deduplication enabled, a 30 second
// visibility timeout, one day of retention, and 20 second long polling.
//
// A creation failure is returned wrapped with the final queue name.
func createFIFOQueue(ctx context.Context, client *sqs.Client, name string) (string, error) {
	const fifoSuffix = ".fifo"
	if !strings.HasSuffix(name, fifoSuffix) {
		name += fifoSuffix
	}

	result, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName: aws.String(name),
		Attributes: map[string]string{
			"FifoQueue":                     "true",
			"ContentBasedDeduplication":     "true", // automatic dedup based on the body
			"VisibilityTimeout":             "30",
			"MessageRetentionPeriod":        "86400",
			"ReceiveMessageWaitTimeSeconds": "20",
		},
	})
	if err != nil {
		return "", fmt.Errorf("create fifo queue %q: %w", name, err)
	}
	return *result.QueueUrl, nil
}

Send — Mengirim Pesan #

// sendMessage JSON-encodes body and sends it as a single message to queueURL.
//
// The message carries two string attributes as metadata: ContentType
// ("application/json") and Source ("order-service"). DelaySeconds is set to 0,
// i.e. no delivery delay (the accepted range noted for this field is 0-900
// seconds).
//
// It returns the marshal error or the SendMessage error unchanged.
func sendMessage(ctx context.Context, client *sqs.Client, queueURL string, body interface{}) error {
	payload, marshalErr := json.Marshal(body)
	if marshalErr != nil {
		return marshalErr
	}

	// stringAttr builds a message attribute of data type "String".
	stringAttr := func(value string) types.MessageAttributeValue {
		return types.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(value),
		}
	}

	input := &sqs.SendMessageInput{
		QueueUrl:     aws.String(queueURL),
		MessageBody:  aws.String(string(payload)),
		DelaySeconds: 0,
		MessageAttributes: map[string]types.MessageAttributeValue{
			"ContentType": stringAttr("application/json"),
			"Source":      stringAttr("order-service"),
		},
	}

	_, sendErr := client.SendMessage(ctx, input)
	return sendErr
}

// sendFIFOMessage JSON-encodes body and sends it to a FIFO queue.
//
// groupID is sent as the MessageGroupId (ordering is guaranteed within a
// group) and deduplicationID as the MessageDeduplicationId (used to prevent
// duplicates).
//
// It returns an error if body cannot be marshalled — nothing is sent in that
// case — or the SendMessage error unchanged.
func sendFIFOMessage(ctx context.Context, client *sqs.Client, queueURL string,
	groupID, deduplicationID string, body interface{}) error {

	data, err := json.Marshal(body)
	if err != nil {
		// Previously ignored, which sent an empty body for unencodable values.
		return fmt.Errorf("marshal fifo message: %w", err)
	}

	_, err = client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:               aws.String(queueURL),
		MessageBody:            aws.String(string(data)),
		MessageGroupId:         aws.String(groupID),
		MessageDeduplicationId: aws.String(deduplicationID),
	})
	return err
}

// sendBatch JSON-encodes every element of messages and sends them with
// SendMessageBatch, which is cheaper than one request per message.
//
// A single batch request carries at most 10 entries, so larger inputs are
// split into consecutive requests of up to 10 entries each. Entry IDs are
// "msg-<index>" where index is the position in messages. An empty input sends
// nothing and returns nil.
//
// All messages are encoded before the first request is made, so an
// unencodable message fails the call without sending anything.
//
// It returns an error when encoding fails, when a batch request fails, or when
// SQS reports any entry as failed; each failed entry is also logged.
func sendBatch(ctx context.Context, client *sqs.Client, queueURL string, messages []interface{}) error {
	const maxEntries = 10 // per-request entry limit for batch operations

	entries := make([]types.SendMessageBatchRequestEntry, 0, len(messages))
	for i, msg := range messages {
		data, err := json.Marshal(msg)
		if err != nil {
			return fmt.Errorf("marshal message %d: %w", i, err)
		}
		entries = append(entries, types.SendMessageBatchRequestEntry{
			Id:          aws.String(fmt.Sprintf("msg-%d", i)),
			MessageBody: aws.String(string(data)),
		})
	}

	failed := 0
	for start := 0; start < len(entries); start += maxEntries {
		end := start + maxEntries
		if end > len(entries) {
			end = len(entries)
		}

		result, err := client.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{
			QueueUrl: aws.String(queueURL),
			Entries:  entries[start:end],
		})
		if err != nil {
			return fmt.Errorf("send batch [%d:%d]: %w", start, end, err)
		}

		// A successful request can still contain per-entry failures.
		// aws.ToString guards against nil Id/Message pointers.
		for _, f := range result.Failed {
			log.Printf("  Gagal ID=%s: %s", aws.ToString(f.Id), aws.ToString(f.Message))
		}
		failed += len(result.Failed)
	}

	if failed > 0 {
		return fmt.Errorf("send batch: %d of %d messages failed", failed, len(entries))
	}
	return nil
}

Receive — Menerima dan Memproses Pesan #

// receiveMessages polls queueURL in an endless loop and passes every received
// message to handler.
//
// Each request asks for up to 10 messages, long-polls for up to 20 seconds,
// and hides the returned messages from other consumers for 30 seconds.
//
// A message is deleted only after handler returns nil. When handler fails, the
// error is logged and the message is left on the queue, so it becomes visible
// again once the visibility timeout expires (extend the timeout if processing
// needs longer). A failed delete is logged and does not stop the loop.
//
// The function returns nil when a receive fails because ctx is done, and a
// wrapped "receive" error for any other receive failure.
func receiveMessages(ctx context.Context, client *sqs.Client, queueURL string,
	handler func(msg types.Message) error) error {

	for {
		out, recvErr := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:              aws.String(queueURL),
			MaxNumberOfMessages:   10,
			WaitTimeSeconds:       20,
			VisibilityTimeout:     30,
			MessageAttributeNames: []string{"All"},
			AttributeNames:        []types.QueueAttributeName{"All"},
		})
		switch {
		case recvErr == nil:
			// fall through to message handling
		case ctx.Err() != nil:
			// Context cancelled: a normal shutdown, not an error.
			return nil
		default:
			return fmt.Errorf("receive: %w", recvErr)
		}

		for i := range out.Messages {
			current := out.Messages[i]

			if procErr := handler(current); procErr != nil {
				log.Printf("Error proses pesan %s: %v", *current.MessageId, procErr)
				continue
			}

			// Processing succeeded, so remove the message from the queue.
			_, delErr := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
				QueueUrl:      aws.String(queueURL),
				ReceiptHandle: current.ReceiptHandle,
			})
			if delErr != nil {
				log.Printf("Gagal hapus pesan: %v", delErr)
			}
		}
	}
}

// extendVisibility sets the visibility timeout of the message identified by
// receiptHandle to newTimeout via ChangeMessageVisibility. Use it when
// processing a message needs more time than the current timeout allows.
//
// It returns the ChangeMessageVisibility error unchanged.
func extendVisibility(ctx context.Context, client *sqs.Client, queueURL string,
	receiptHandle *string, newTimeout int32) error {

	input := sqs.ChangeMessageVisibilityInput{
		VisibilityTimeout: newTimeout,
		ReceiptHandle:     receiptHandle,
		QueueUrl:          aws.String(queueURL),
	}

	if _, changeErr := client.ChangeMessageVisibility(ctx, &input); changeErr != nil {
		return changeErr
	}
	return nil
}

// deleteBatch deletes the given messages with DeleteMessageBatch.
//
// A single batch request carries at most 10 entries, so larger inputs are
// split into consecutive requests of up to 10 entries each. Entry IDs are
// "del-<index>" where index is the position in messages. An empty input sends
// nothing and returns nil.
//
// It returns an error when a batch request fails or when SQS reports any entry
// as failed; each failed entry is also logged. Messages whose deletion failed
// stay on the queue and will be redelivered.
func deleteBatch(ctx context.Context, client *sqs.Client, queueURL string,
	messages []types.Message) error {

	const maxEntries = 10 // per-request entry limit for batch operations

	failed := 0
	for start := 0; start < len(messages); start += maxEntries {
		end := start + maxEntries
		if end > len(messages) {
			end = len(messages)
		}

		entries := make([]types.DeleteMessageBatchRequestEntry, 0, end-start)
		for i := start; i < end; i++ {
			entries = append(entries, types.DeleteMessageBatchRequestEntry{
				Id:            aws.String(fmt.Sprintf("del-%d", i)),
				ReceiptHandle: messages[i].ReceiptHandle,
			})
		}

		result, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
			QueueUrl: aws.String(queueURL),
			Entries:  entries,
		})
		if err != nil {
			return fmt.Errorf("delete batch [%d:%d]: %w", start, end, err)
		}

		// A successful request can still contain per-entry failures.
		for _, f := range result.Failed {
			log.Printf("  Gagal hapus ID=%s: %s", aws.ToString(f.Id), aws.ToString(f.Message))
		}
		failed += len(result.Failed)
	}

	if failed > 0 {
		return fmt.Errorf("delete batch: %d of %d messages failed", failed, len(messages))
	}
	return nil
}

Contoh Program Lengkap — Job Queue #

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os/signal"
    "syscall"
    "time"
    "os"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// Job is the unit of work exchanged through the queue. It is encoded to and
// decoded from JSON using the keys given in the struct tags.
type Job struct {
    ID       string    `json:"id"`
    Type     string    `json:"type"`
    Payload  string    `json:"payload"`
    Priority int       `json:"priority"`
    Created  time.Time `json:"created"`
}

// JobResult describes the outcome of processing one job.
//
// Output and Error are omitted from the JSON encoding when empty. Duration is
// a time.Duration, which encoding/json writes as an integer number of
// nanoseconds.
type JobResult struct {
	JobID       string        `json:"job_id"`
	Success     bool          `json:"success"`
	Output      string        `json:"output,omitempty"`
	Error       string        `json:"error,omitempty"`
	Duration    time.Duration `json:"duration"`
	ProcessedAt time.Time     `json:"processed_at"`
}

// JobQueue is a wrapper around an SQS client bound to a single queue URL.
type JobQueue struct {
    client   *sqs.Client // SQS API client used for every request
    queueURL string      // URL of the queue that jobs are sent to and received from
}

// NewJobQueue returns a JobQueue that talks to queueURL using an SQS client
// configured for the ap-southeast-1 region via the SDK's default
// configuration loader.
//
// The configuration loading error, if any, is returned unchanged.
func NewJobQueue(ctx context.Context, queueURL string) (*JobQueue, error) {
	awsCfg, loadErr := config.LoadDefaultConfig(ctx, config.WithRegion("ap-southeast-1"))
	if loadErr != nil {
		return nil, loadErr
	}

	jq := new(JobQueue)
	jq.client = sqs.NewFromConfig(awsCfg)
	jq.queueURL = queueURL
	return jq, nil
}

// Enqueue JSON-encodes job and sends it to the queue.
//
// The job type and priority are also attached as message attributes
// ("JobType" as String, "Priority" as Number), which lets a consumer read them
// without parsing the body. A successful send is logged.
//
// It returns the marshal error unchanged, or the send error wrapped with the
// job ID.
func (q *JobQueue) Enqueue(ctx context.Context, job Job) error {
	payload, marshalErr := json.Marshal(job)
	if marshalErr != nil {
		return marshalErr
	}

	attributes := map[string]types.MessageAttributeValue{
		"JobType": {
			DataType:    aws.String("String"),
			StringValue: aws.String(job.Type),
		},
		"Priority": {
			DataType:    aws.String("Number"),
			StringValue: aws.String(fmt.Sprintf("%d", job.Priority)),
		},
	}

	input := &sqs.SendMessageInput{
		QueueUrl:          aws.String(q.queueURL),
		MessageBody:       aws.String(string(payload)),
		MessageAttributes: attributes,
	}

	if _, sendErr := q.client.SendMessage(ctx, input); sendErr != nil {
		return fmt.Errorf("enqueue job %s: %w", job.ID, sendErr)
	}

	log.Printf("[QUEUE] Job %s (%s) dikirim", job.ID, job.Type)
	return nil
}

// Process receives jobs from the queue and runs handler on each one until ctx
// is done.
//
// Each receive asks for up to 5 messages, long-polls for up to 20 seconds, and
// hides the returned messages for 60 seconds. Per message:
//   - a body that is missing or is not valid Job JSON is logged and the
//     message is deleted, since retrying it cannot succeed;
//   - when handler returns an error, the message is left on the queue so it is
//     redelivered (and moved to the DLQ after maxReceiveCount receives, if a
//     redrive policy is configured);
//   - when handler succeeds, the message is deleted.
//
// A failed delete is logged; the message will then be redelivered, so handler
// should tolerate seeing the same job more than once.
//
// After a receive error the worker waits 5 seconds before retrying; that wait
// ends immediately when ctx is cancelled so shutdown is not delayed.
func (q *JobQueue) Process(ctx context.Context, handler func(Job) (string, error)) {
	log.Println("[WORKER] Mulai memproses job...")

	for {
		select {
		case <-ctx.Done():
			log.Println("[WORKER] Berhenti")
			return
		default:
		}

		result, err := q.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:              aws.String(q.queueURL),
			MaxNumberOfMessages:   5,
			WaitTimeSeconds:       20,
			VisibilityTimeout:     60,
			MessageAttributeNames: []string{"All"},
		})
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			log.Printf("[WORKER] Receive error: %v", err)

			// Back off before retrying, but stop waiting as soon as ctx is done.
			select {
			case <-ctx.Done():
				log.Println("[WORKER] Berhenti")
				return
			case <-time.After(5 * time.Second):
			}
			continue
		}

		for _, msg := range result.Messages {
			var job Job
			// aws.ToString yields "" for a nil body, which fails to unmarshal
			// and is handled as a malformed message instead of panicking.
			if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &job); err != nil {
				log.Printf("[WORKER] Error unmarshal: %v", err)
				q.deleteMessage(ctx, msg.ReceiptHandle)
				continue
			}

			start := time.Now()
			log.Printf("[WORKER] Proses job %s (%s)", job.ID, job.Type)

			output, err := handler(job)
			duration := time.Since(start)

			if err != nil {
				log.Printf("[WORKER] Job %s GAGAL (%v): %v", job.ID, duration, err)
				// Leave the message on the queue (no delete) so it is retried.
				continue
			}

			log.Printf("[WORKER] Job %s SELESAI (%v): %s", job.ID, duration, output)

			// Remove the message now that the job succeeded.
			q.deleteMessage(ctx, msg.ReceiptHandle)
		}
	}
}

// deleteMessage removes the message identified by receiptHandle from the queue
// and logs a failure instead of discarding it.
func (q *JobQueue) deleteMessage(ctx context.Context, receiptHandle *string) {
	_, err := q.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(q.queueURL),
		ReceiptHandle: receiptHandle,
	})
	if err != nil {
		log.Printf("[WORKER] Gagal hapus pesan: %v", err)
	}
}

// processJob simulates the work for a job by sleeping for a type-specific
// duration and returning a message that includes the job payload.
//
// Known types are "send_email" (100ms), "resize_image" (500ms),
// "generate_report" (2s) and "sync_data" (300ms). Any other type returns an
// empty string and an error immediately, without sleeping.
func processJob(job Job) (string, error) {
	var (
		delay  time.Duration
		format string
	)

	switch job.Type {
	case "send_email":
		delay, format = 100*time.Millisecond, "Email terkirim untuk payload: %s"
	case "resize_image":
		delay, format = 500*time.Millisecond, "Gambar %s berhasil di-resize"
	case "generate_report":
		delay, format = 2*time.Second, "Laporan %s berhasil dibuat"
	case "sync_data":
		delay, format = 300*time.Millisecond, "Data %s berhasil disinkronkan"
	default:
		return "", fmt.Errorf("jenis job tidak dikenal: %s", job.Type)
	}

	time.Sleep(delay)
	return fmt.Sprintf(format, job.Payload), nil
}

// main enqueues a handful of sample jobs and then processes jobs from the
// queue until SIGINT or SIGTERM is received.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel the context on SIGINT/SIGTERM so the worker shuts down gracefully.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		log.Println("Menerima sinyal, shutdown...")
		cancel()
	}()

	// Placeholder URL: in production use the real queue URL from AWS.
	queueURL := "https://sqs.ap-southeast-1.amazonaws.com/123456789/job-queue"

	// For LocalStack development:
	// queueURL := "http://localhost:4566/000000000000/job-queue"

	queue, err := NewJobQueue(ctx, queueURL)
	if err != nil {
		// Fatalf with an explicit verb: log.Fatal would print the error
		// directly after the colon with no separating space.
		log.Fatalf("Buat job queue: %v", err)
	}

	// Enqueue a few sample jobs. The e-mail payloads are example addresses.
	fmt.Println("=== Mengirim Job ===")
	jobs := []Job{
		{ID: "job-001", Type: "send_email", Payload: "user@example.com", Priority: 1, Created: time.Now()},
		{ID: "job-002", Type: "resize_image", Payload: "photo-123.jpg", Priority: 2, Created: time.Now()},
		{ID: "job-003", Type: "generate_report", Payload: "monthly-Q4", Priority: 3, Created: time.Now()},
		{ID: "job-004", Type: "sync_data", Payload: "users-table", Priority: 1, Created: time.Now()},
		{ID: "job-005", Type: "send_email", Payload: "admin@example.com", Priority: 1, Created: time.Now()},
	}

	for _, job := range jobs {
		if err := queue.Enqueue(ctx, job); err != nil {
			log.Printf("Gagal enqueue: %v", err)
		}
	}

	// Process jobs until the context is cancelled.
	fmt.Println("\n=== Memproses Job ===")
	queue.Process(ctx, processJob)
}

Ringkasan #

  • Standard Queue untuk throughput tinggi; FIFO Queue (nama berakhir .fifo) untuk urutan yang dijamin.
  • Long polling (WaitTimeSeconds: 20) mengurangi biaya dan latensi vs short polling — selalu gunakan ini.
  • Visibility timeout menyembunyikan pesan dari consumer lain saat diproses; perpanjang dengan ChangeMessageVisibility untuk proses lama.
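  • Hapus pesan dengan DeleteMessage hanya setelah berhasil diproses; karena delivery bersifat at-least-once, pesan yang sama bisa diterima lebih dari sekali sehingga handler sebaiknya idempoten.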
  • Dead Letter Queue untuk pesan yang gagal setelah maxReceiveCount kali — konfigurasi via RedrivePolicy.
  • Batch send (SendMessageBatch) dan batch delete (DeleteMessageBatch) untuk mengurangi biaya API call.
  • Message attributes untuk metadata tanpa harus parse body — berguna untuk routing di consumer.
  • FIFO deduplication: ContentBasedDeduplication (SHA-256 body) atau MessageDeduplicationId eksplisit.
  • MessageGroupId di FIFO queue untuk menjamin urutan dalam satu group (misalnya per OrderID).
  • IAM role adalah cara terbaik untuk autentikasi di EC2/ECS/Lambda — tidak perlu hard-code credentials.

← Sebelumnya: RabbitMQ   Berikutnya: Google Pub/Sub →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact