RabbitMQ #

RabbitMQ adalah message broker tradisional yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang menyimpan pesan di log sesuai kebijakan retensi (pesan tetap ada setelah dibaca), RabbitMQ menghapus pesan dari queue setelah di-acknowledge oleh consumer — lebih cocok untuk task queue, job distribution, dan RPC. Kelebihannya: routing yang sangat fleksibel melalui sistem Exchange, dead letter queue bawaan, dan kemudahan setup. Di Go, library yang digunakan adalah github.com/rabbitmq/amqp091-go.

Konsep Dasar RabbitMQ #

Producer   → mengirim pesan ke Exchange
Exchange   → menerima pesan dan routing ke Queue berdasarkan aturan
Binding    → aturan yang menghubungkan Exchange ke Queue
Queue      → antrian tempat pesan menunggu dikonsumsi
Consumer   → membaca pesan dari Queue

Exchange Types:
  direct  → routing berdasarkan routing key yang persis sama
  fanout  → broadcast ke semua queue yang terikat
  topic   → routing key dengan wildcard (* dan #)
  headers → routing berdasarkan message headers

Instalasi #

go get github.com/rabbitmq/amqp091-go

Koneksi dan Channel #

import amqp "github.com/rabbitmq/amqp091-go"

// connect dials the broker at url and opens one channel on the resulting
// connection.
//
// URL format: amqp://user:password@host:port/vhost
//
// On any failure both returned handles are nil and the error is wrapped with
// the step that failed ("dial" or "channel").
func connect(url string) (*amqp.Connection, *amqp.Channel, error) {
    connection, dialErr := amqp.Dial(url)
    if dialErr != nil {
        return nil, nil, fmt.Errorf("dial: %w", dialErr)
    }

    channel, chanErr := connection.Channel()
    if chanErr == nil {
        return connection, channel, nil
    }

    // No channel could be opened: release the connection before reporting.
    connection.Close()
    return nil, nil, fmt.Errorf("channel: %w", chanErr)
}

Koneksi Tahan Putus (Reconnection) #

// RabbitMQ owns one AMQP connection plus one channel and re-dials the broker
// whenever the connection is lost, until Close is called.
//
// Only the connection and channel are re-created on reconnect; exchanges,
// queues, bindings and consumers set up on the previous channel are not
// restored by this type.
type RabbitMQ struct {
    url  string
    done chan struct{} // closed by Close; wakes watchReconnect immediately

    mu     sync.Mutex // guards conn, ch and closed
    conn   *amqp.Connection
    ch     *amqp.Channel
    closed bool
}

// NewRabbitMQ connects to the broker at url and starts a background goroutine
// that reconnects when the connection drops. The goroutine stops once Close
// is called.
func NewRabbitMQ(url string) (*RabbitMQ, error) {
    r := &RabbitMQ{url: url, done: make(chan struct{})}
    if err := r.connect(); err != nil {
        return nil, err
    }
    go r.watchReconnect()
    return r, nil
}

// Channel returns the channel of the current connection. The returned value
// becomes stale after a reconnect, so callers should fetch it again rather
// than cache it.
func (r *RabbitMQ) Channel() *amqp.Channel {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.ch
}

// connect dials the broker, opens a channel and installs both on r.
// It returns amqp.ErrClosed (after releasing the fresh connection) when Close
// was called while the dial was in progress.
func (r *RabbitMQ) connect() error {
    conn, err := amqp.Dial(r.url)
    if err != nil {
        return err
    }
    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return err
    }

    r.mu.Lock()
    defer r.mu.Unlock()
    if r.closed {
        // Close won the race; do not leak the connection we just opened.
        ch.Close()
        conn.Close()
        return amqp.ErrClosed
    }
    r.conn = conn
    r.ch = ch
    return nil
}

// watchReconnect blocks until the current connection closes and then re-dials
// it, repeating until Close is called.
func (r *RabbitMQ) watchReconnect() {
    for {
        r.mu.Lock()
        conn, closed := r.conn, r.closed
        r.mu.Unlock()
        if closed {
            return
        }

        // Buffered so the library never blocks while delivering the error,
        // even if this goroutine has already returned via r.done.
        notify := conn.NotifyClose(make(chan *amqp.Error, 1))

        select {
        case <-r.done:
            return
        case reason := <-notify:
            // Close also closes the connection, so both cases can be ready at
            // once; never reconnect after an intentional shutdown.
            select {
            case <-r.done:
                return
            default:
            }
            // reason is nil when the notify channel was closed without an
            // error, e.g. the connection was already dead when the listener
            // was registered. Either way the connection is unusable.
            log.Printf("Koneksi terputus: %v, mencoba reconnect...", reason)
        }

        if !r.redial() {
            return
        }
    }
}

// redial retries connect every 5 seconds until it succeeds. It reports false
// when Close was called before a connection could be re-established.
func (r *RabbitMQ) redial() bool {
    for {
        select {
        case <-r.done:
            return false
        case <-time.After(5 * time.Second):
        }

        if err := r.connect(); err != nil {
            log.Printf("Reconnect gagal: %v, coba lagi...", err)
            continue
        }
        log.Println("Reconnect berhasil!")
        return true
    }
}

// Close stops the reconnect goroutine and closes the channel and connection.
// It is safe to call more than once and from any goroutine.
func (r *RabbitMQ) Close() {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.closed {
        return
    }
    r.closed = true
    if r.done != nil {
        close(r.done)
    }
    if r.ch != nil {
        r.ch.Close()
    }
    if r.conn != nil {
        r.conn.Close()
    }
}

Exchange dan Queue Setup #

// setupDirectExchange declares the durable direct exchange "orders" and the
// durable queue "order.processing", then binds them with the routing key
// "order.processing".
//
// The queue is declared with a dead-letter exchange ("orders.dlx", routing
// key "order.failed") and a per-message TTL of one hour.
func setupDirectExchange(ch *amqp.Channel) error {
    const (
        exchangeName = "orders"
        queueName    = "order.processing" // an empty name would make the broker generate one
    )

    // Arguments after the type: durable, auto-deleted, internal, no-wait, args.
    if err := ch.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil); err != nil {
        return fmt.Errorf("exchange declare: %w", err)
    }

    queueArgs := amqp.Table{
        "x-dead-letter-exchange":    "orders.dlx",
        "x-dead-letter-routing-key": "order.failed",
        "x-message-ttl":             int32(3600000), // 1 hour, in milliseconds
    }

    // Arguments after the name: durable, delete-when-unused, exclusive, no-wait, args.
    queue, err := ch.QueueDeclare(queueName, true, false, false, false, queueArgs)
    if err != nil {
        return fmt.Errorf("queue declare: %w", err)
    }

    // The routing key used for the binding is the same literal as the queue name.
    return ch.QueueBind(queue.Name, queueName, exchangeName, false, nil)
}

// setupFanoutExchange declares the durable fanout exchange "notifications"
// and, for each of the services email, sms and push, a durable queue named
// "notification.<service>" bound to that exchange.
func setupFanoutExchange(ch *amqp.Channel) error {
    const exchange = "notifications"

    if err := ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
        return err
    }

    // One queue per service, so every service receives its own copy.
    services := [...]string{"email", "sms", "push"}
    for i := range services {
        declared, err := ch.QueueDeclare("notification."+services[i], true, false, false, false, nil)
        if err != nil {
            return err
        }

        // A fanout binding is made with an empty routing key.
        bindErr := ch.QueueBind(declared.Name, "", exchange, false, nil)
        if bindErr != nil {
            return bindErr
        }
    }
    return nil
}

// setupTopicExchange declares the durable topic exchange "events" and binds
// four durable queues to it, each with its own wildcard pattern.
//
// In a topic pattern "*" stands for exactly one word and "#" for zero or more
// words.
func setupTopicExchange(ch *amqp.Channel) error {
    const exchange = "events"

    if err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil); err != nil {
        return err
    }

    // Each entry is {queue name, binding pattern}; order is preserved.
    bindings := [][2]string{
        {"events.orders.all", "order.#"}, // every order event, any depth
        {"events.payments", "payment.*"}, // payment events with exactly one word after "payment."
        {"events.critical", "#.failed"},  // any event whose last word is "failed"
        {"events.audit", "#"},            // everything (audit log)
    }

    for i := range bindings {
        name, pattern := bindings[i][0], bindings[i][1]

        declared, err := ch.QueueDeclare(name, true, false, false, false, nil)
        if err != nil {
            return err
        }

        bindErr := ch.QueueBind(declared.Name, pattern, exchange, false, nil)
        if bindErr != nil {
            return bindErr
        }
    }
    return nil
}

Publish — Mengirim Pesan #

// publish JSON-encodes body and sends it to exchange with routingKey as a
// persistent message carrying a fresh UUID message ID, the current time and
// the headers source=order-service, version=1.0.
//
// The message is published with mandatory=true, which asks the broker to
// return it when no queue matches.
// NOTE(review): nothing in this function listens for returned messages;
// confirm a return listener is registered elsewhere if returns matter.
func publish(ch *amqp.Channel, exchange, routingKey string, body interface{}) error {
    payload, err := json.Marshal(body)
    if err != nil {
        return err
    }

    message := amqp.Publishing{
        ContentType:  "application/json",
        DeliveryMode: amqp.Persistent, // survives a broker restart
        MessageId:    uuid.New().String(),
        Timestamp:    time.Now(),
        Body:         payload,
        Headers: amqp.Table{
            "source":  "order-service",
            "version": "1.0",
        },
    }

    const (
        mandatory = true
        immediate = false
    )
    return ch.PublishWithContext(context.Background(), exchange, routingKey, mandatory, immediate, message)
}

// publishWithConfirm JSON-encodes body, publishes it as a persistent message
// and blocks until the broker acknowledges or rejects it (publisher confirm).
//
// It returns an error when encoding fails, when the channel cannot be put
// into confirm mode, when the publish itself fails, or when the broker does
// not acknowledge the message.
func publishWithConfirm(ch *amqp.Channel, exchange, routingKey string, body interface{}) error {
    // Encode first so a bad payload never touches the channel.
    data, err := json.Marshal(body)
    if err != nil {
        return fmt.Errorf("marshal: %w", err)
    }

    // Enable confirm mode on the channel.
    if err := ch.Confirm(false); err != nil {
        return err
    }

    // A deferred confirmation is tied to this one message. Registering a new
    // NotifyPublish listener on every call would instead pile up listeners on
    // the channel that nobody drains after the call returns.
    confirmation, err := ch.PublishWithDeferredConfirmWithContext(
        context.Background(), exchange, routingKey, false, false,
        amqp.Publishing{
            ContentType:  "application/json",
            DeliveryMode: amqp.Persistent,
            Body:         data,
        })
    if err != nil {
        return err
    }

    // Wait for the broker's verdict; a nil confirmation means the channel was
    // not in confirm mode, which is treated as "not confirmed".
    if confirmation == nil || !confirmation.Wait() {
        return fmt.Errorf("pesan tidak dikonfirmasi oleh broker")
    }
    return nil
}

Consume — Membaca Pesan #

// startConsumer sets a prefetch of 5 on ch, starts consuming queueName with
// manual acknowledgements and runs handler for every delivery in a background
// goroutine.
//
// A delivery is acked when handler returns nil and nacked without requeue
// otherwise, so a queue configured with a dead-letter exchange routes the
// failed message there. The goroutine ends when the delivery channel returned
// by Consume is closed.
func startConsumer(ch *amqp.Channel, queueName string, handler func(amqp.Delivery) error) error {
    // At most 5 unacknowledged deliveries are outstanding at a time.
    if err := ch.Qos(5, 0, false); err != nil {
        return fmt.Errorf("qos: %w", err)
    }

    msgs, err := ch.Consume(
        queueName, // queue
        "",        // consumer tag (empty = auto-generated)
        false,     // auto-ack (false = manual ack)
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,
    )
    if err != nil {
        return fmt.Errorf("consume: %w", err)
    }

    go func() {
        for msg := range msgs {
            if err := handler(msg); err != nil {
                log.Printf("Error proses pesan: %v", err)
                // requeue=false: the message is dead-lettered or dropped,
                // never redelivered to this queue.
                if nackErr := msg.Nack(false, false); nackErr != nil {
                    log.Printf("Error nack pesan: %v", nackErr)
                }
                continue
            }
            // A failed ack must not go unnoticed: the broker will redeliver
            // the message once the channel closes.
            if ackErr := msg.Ack(false); ackErr != nil {
                log.Printf("Error ack pesan: %v", ackErr)
            }
        }
    }()

    return nil
}

Dead Letter Queue (DLQ) #

Pesan yang gagal diproses (Nack tanpa requeue) masuk ke DLQ untuk diinspeksi atau di-retry:

// setupDLQ declares the durable direct exchange "orders.dlx" and the durable
// queue "order.failed", and binds them with the routing key "order.failed".
func setupDLQ(ch *amqp.Channel) error {
    const (
        deadLetterExchange = "orders.dlx"
        failedQueue        = "order.failed" // also used as the binding's routing key
    )

    if err := ch.ExchangeDeclare(deadLetterExchange, "direct", true, false, false, false, nil); err != nil {
        return err
    }

    if _, err := ch.QueueDeclare(failedQueue, true, false, false, false, nil); err != nil {
        return err
    }

    return ch.QueueBind(failedQueue, failedQueue, deadLetterExchange, false, nil)
}

// consumeDLQ consumes the "order.failed" queue, logs every dead-lettered
// message and republishes it to the "orders" exchange (routing key
// "order.processing") until its "x-retry-count" header reaches 3. After that
// the message is only logged as a permanent failure.
//
// The handler runs under startConsumer, so a failed republish results in a
// nack without requeue on "order.failed".
func consumeDLQ(ch *amqp.Channel) error {
    return startConsumer(ch, "order.failed", func(msg amqp.Delivery) error {
        log.Printf("DLQ: MessageID=%s", msg.MessageId)
        log.Printf("  Headers: %v", msg.Headers)
        log.Printf("  Body: %s", string(msg.Body))

        // A missing or differently-typed header counts as zero retries.
        retryCount, _ := msg.Headers["x-retry-count"].(int32)
        if retryCount >= 3 {
            // Retries exhausted; left for manual review.
            log.Printf("PERMANENT FAILURE: %s", msg.MessageId)
            return nil
        }

        // Build a fresh table: msg.Headers is nil when the message carried no
        // headers, and assigning into a nil map panics.
        headers := make(amqp.Table, len(msg.Headers)+1)
        for key, value := range msg.Headers {
            headers[key] = value
        }
        headers["x-retry-count"] = retryCount + 1

        return ch.PublishWithContext(context.Background(),
            "orders", "order.processing", false, false,
            amqp.Publishing{
                ContentType:  msg.ContentType,
                DeliveryMode: amqp.Persistent,
                MessageId:    msg.MessageId, // keep the ID so later log lines can be correlated
                Headers:      headers,
                Body:         msg.Body,
            })
    })
}

Contoh Program Lengkap — Sistem Notifikasi #

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

// Notification is the JSON payload that publishNotification sends to the
// "notifications" exchange and that startNotificationWorker decodes.
type Notification struct {
    // ID is also used as the AMQP message ID when the notification is published.
    ID        string    `json:"id"`
    Type      string    `json:"type"`    // email, sms, push
    // UserID is the recipient shown in the worker output.
    UserID    string    `json:"user_id"`
    // Subject is omitted from the JSON when empty; the email worker prints it.
    Subject   string    `json:"subject,omitempty"`
    // Message is the body printed by the sms and push workers.
    Message   string    `json:"message"`
    // NOTE(review): Type and Channel carry the same documented values and
    // neither is read by the worker code in this file, which dispatches on
    // the queue it consumes from instead.
    Channel   string    `json:"channel"` // email, sms, push
    // Timestamp is also used as the AMQP message timestamp when published.
    Timestamp time.Time `json:"timestamp"`
}

// setupRabbitMQ declares the durable fanout exchange "notifications" and one
// durable queue per notification channel (email, sms, push), each named
// "notification.<channel>", limited to a 24-hour message TTL and bound to the
// exchange.
func setupRabbitMQ(ch *amqp.Channel) error {
    const exchange = "notifications"

    err := ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
    if err != nil {
        return err
    }

    kinds := [...]string{"email", "sms", "push"}
    for i := range kinds {
        queueArgs := amqp.Table{"x-message-ttl": int32(86400000)} // 24 hours, in milliseconds

        declared, declErr := ch.QueueDeclare("notification."+kinds[i], true, false, false, false, queueArgs)
        if declErr != nil {
            return declErr
        }

        // Fanout binding: the routing key is left empty.
        if bindErr := ch.QueueBind(declared.Name, "", exchange, false, nil); bindErr != nil {
            return bindErr
        }
    }
    return nil
}

// publishNotification JSON-encodes notif and publishes it as a persistent
// message to the "notifications" fanout exchange, using notif.ID as the
// message ID and notif.Timestamp as the message timestamp.
//
// It returns an error when encoding or publishing fails.
func publishNotification(ch *amqp.Channel, notif Notification) error {
    data, err := json.Marshal(notif)
    if err != nil {
        // Never publish an empty body on an encoding failure.
        return fmt.Errorf("marshal notification %q: %w", notif.ID, err)
    }

    return ch.PublishWithContext(
        context.Background(),
        "notifications", "", false, false,
        amqp.Publishing{
            ContentType:  "application/json",
            DeliveryMode: amqp.Persistent,
            MessageId:    notif.ID,
            Timestamp:    notif.Timestamp,
            Body:         data,
        },
    )
}

// startNotificationWorker sets a prefetch of 3 on ch and starts a background
// goroutine that consumes "notification.<channelName>" with manual acks,
// decodes each delivery as a Notification and simulates sending it.
//
// Deliveries that fail to decode are nacked without requeue; the queues
// declared by setupRabbitMQ have no dead-letter exchange, so such messages
// are discarded. The goroutine ends when the delivery channel is closed.
func startNotificationWorker(ch *amqp.Channel, channelName string) error {
    // Without a working prefetch limit the broker may push every queued
    // message at once, so a failure here is reported rather than ignored.
    if err := ch.Qos(3, 0, false); err != nil {
        return fmt.Errorf("qos: %w", err)
    }

    msgs, err := ch.Consume(
        "notification."+channelName, "", false, false, false, false, nil)
    if err != nil {
        return err
    }

    go func() {
        log.Printf("[%s] Worker siap", channelName)
        for msg := range msgs {
            var notif Notification
            if err := json.Unmarshal(msg.Body, &notif); err != nil {
                log.Printf("[%s] ERROR unmarshal: %v", channelName, err)
                if nackErr := msg.Nack(false, false); nackErr != nil {
                    log.Printf("[%s] ERROR nack: %v", channelName, nackErr)
                }
                continue
            }

            // Simulated delivery, one branch per notification channel.
            switch channelName {
            case "email":
                fmt.Printf("  📧 [EMAIL] ke %s: %s\n", notif.UserID, notif.Subject)
                time.Sleep(50 * time.Millisecond)
            case "sms":
                // Preview is cut at 30 bytes, which may split a multi-byte character.
                fmt.Printf("  📱 [SMS] ke %s: %s\n", notif.UserID, notif.Message[:min(30, len(notif.Message))]+"...")
                time.Sleep(30 * time.Millisecond)
            case "push":
                fmt.Printf("  🔔 [PUSH] ke %s: %s\n", notif.UserID, notif.Message)
                time.Sleep(10 * time.Millisecond)
            }

            if ackErr := msg.Ack(false); ackErr != nil {
                log.Printf("[%s] ERROR ack: %v", channelName, ackErr)
            }
        }
    }()
    return nil
}

// min returns the smaller of a and b.
func min(a, b int) int {
    if b < a {
        return b
    }
    return a
}

// main connects to a local RabbitMQ broker, declares the notification
// topology, starts one worker per notification channel, publishes three
// sample notifications and then waits for SIGINT/SIGTERM or three seconds,
// whichever comes first.
func main() {
    connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal("Koneksi RabbitMQ:", err)
    }
    defer connection.Close()

    channel, err := connection.Channel()
    if err != nil {
        log.Fatal("Channel:", err)
    }
    defer channel.Close()

    if setupErr := setupRabbitMQ(channel); setupErr != nil {
        log.Fatal("Setup:", setupErr)
    }
    fmt.Println("✓ RabbitMQ siap")

    // One worker per notification channel, all sharing the same AMQP channel.
    workerKinds := [...]string{"email", "sms", "push"}
    for i := range workerKinds {
        if workerErr := startNotificationWorker(channel, workerKinds[i]); workerErr != nil {
            log.Fatal("Worker:", workerErr)
        }
    }

    // Publish a few sample notifications.
    fmt.Println("\n=== Mengirim Notifikasi ===")
    outgoing := []Notification{
        {
            ID:        "notif-001",
            UserID:    "USR-42",
            Subject:   "Pesanan Dikonfirmasi",
            Message:   "Pesanan ORD-001 senilai Rp1.500.000 telah dikonfirmasi.",
            Timestamp: time.Now(),
        },
        {
            ID:        "notif-002",
            UserID:    "USR-99",
            Subject:   "Pembayaran Berhasil",
            Message:   "Pembayaran untuk ORD-002 berhasil diterima.",
            Timestamp: time.Now(),
        },
        {
            ID:        "notif-003",
            UserID:    "USR-42",
            Subject:   "Pesanan Sedang Dikirim",
            Message:   "Pesanan ORD-001 sedang dalam perjalanan. No. resi: JNE123456.",
            Timestamp: time.Now(),
        },
    }

    for _, item := range outgoing {
        if pubErr := publishNotification(channel, item); pubErr != nil {
            log.Printf("Gagal publish: %v", pubErr)
            continue
        }
        fmt.Printf("  Dikirim: %s (to: %s)\n", item.ID, item.UserID)
    }

    // Give the workers time to drain the queues, unless interrupted first.
    fmt.Println("\n=== Memproses Notifikasi ===")
    interrupts := make(chan os.Signal, 1)
    signal.Notify(interrupts, syscall.SIGINT, syscall.SIGTERM)
    deadline := time.After(3 * time.Second)

    select {
    case <-interrupts:
        fmt.Println("\nShutdown...")
    case <-deadline:
        fmt.Println("\nSelesai.")
    }
}

Ringkasan #

  • amqp091-go adalah library resmi untuk RabbitMQ di Go — fork dari streadway/amqp yang tidak lagi dikelola.
  • Exchange types: direct (routing key tepat), fanout (broadcast), topic (wildcard * dan #), headers.
  • DeliveryMode: amqp.Persistent wajib untuk pesan yang tidak boleh hilang saat broker restart.
  • Manual ACK (autoAck=false) dengan msg.Ack() setelah berhasil proses, msg.Nack(false, false) untuk DLQ.
  • ch.Qos(n, 0, false) untuk prefetch — batasi berapa pesan dikirim sebelum worker memberi ACK.
  • Dead Letter Queue untuk pesan yang gagal — Nack tanpa requeue → masuk DLQ untuk analisis/retry.
  • Publisher Confirm untuk jaminan pesan sampai ke broker — aktifkan dengan ch.Confirm(false).
  • Reconnection logic penting untuk production — pantau conn.NotifyClose() dan reconnect otomatis.
  • x-message-ttl untuk expiry pesan yang tidak diproses; x-dead-letter-exchange untuk routing pesan expired/gagal.
  • Topic exchange dengan # (nol atau lebih kata) dan * (tepat satu kata) untuk routing yang sangat fleksibel.

← Sebelumnya: Kafka   Berikutnya: Amazon SQS →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact