RabbitMQ #
RabbitMQ adalah message broker tradisional yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang menyimpan log secara permanen, RabbitMQ menghapus pesan setelah dikonsumsi — lebih cocok untuk task queue, job distribution, dan RPC. Kelebihannya: routing yang sangat fleksibel melalui sistem Exchange, dead letter queue bawaan, dan kemudahan setup. Go menggunakan library github.com/rabbitmq/amqp091-go.
Konsep Dasar RabbitMQ #
Producer → mengirim pesan ke Exchange
Exchange → menerima pesan dan routing ke Queue berdasarkan aturan
Binding → aturan yang menghubungkan Exchange ke Queue
Queue → antrian tempat pesan menunggu dikonsumsi
Consumer → membaca pesan dari Queue
Exchange Types:
direct → routing berdasarkan routing key yang persis sama
fanout → broadcast ke semua queue yang terikat
topic → routing key dengan wildcard (* dan #)
headers → routing berdasarkan message headers
Instalasi #
go get github.com/rabbitmq/amqp091-go
Koneksi dan Channel #
import amqp "github.com/rabbitmq/amqp091-go"
func connect(url string) (*amqp.Connection, *amqp.Channel, error) {
// Format: amqp://user:password@host:port/vhost
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, fmt.Errorf("dial: %w", err)
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("channel: %w", err)
}
return conn, ch, nil
}
Koneksi Tahan Putus (Reconnection) #
type RabbitMQ struct {
url string
conn *amqp.Connection
ch *amqp.Channel
mu sync.Mutex
closed bool
}
func NewRabbitMQ(url string) (*RabbitMQ, error) {
r := &RabbitMQ{url: url}
if err := r.connect(); err != nil {
return nil, err
}
go r.watchReconnect()
return r, nil
}
func (r *RabbitMQ) connect() error {
conn, err := amqp.Dial(r.url)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return err
}
r.conn = conn
r.ch = ch
return nil
}
func (r *RabbitMQ) watchReconnect() {
for !r.closed {
reason, ok := <-r.conn.NotifyClose(make(chan *amqp.Error))
if !ok || r.closed {
break
}
log.Printf("Koneksi terputus: %v, mencoba reconnect...", reason)
for {
time.Sleep(5 * time.Second)
if err := r.connect(); err != nil {
log.Printf("Reconnect gagal: %v, coba lagi...", err)
continue
}
log.Println("Reconnect berhasil!")
break
}
}
}
func (r *RabbitMQ) Close() {
r.closed = true
r.ch.Close()
r.conn.Close()
}
Exchange dan Queue Setup #
func setupDirectExchange(ch *amqp.Channel) error {
// Deklarasi exchange
err := ch.ExchangeDeclare(
"orders", // nama
"direct", // type: direct, fanout, topic, headers
true, // durable: tetap ada setelah restart
false, // auto-deleted
false, // internal
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("exchange declare: %w", err)
}
// Deklarasi queue
q, err := ch.QueueDeclare(
"order.processing", // nama (kosong = generate random)
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-dead-letter-exchange": "orders.dlx", // dead letter exchange
"x-dead-letter-routing-key": "order.failed", // routing key untuk DLQ
"x-message-ttl": int32(3600000), // TTL 1 jam (ms)
},
)
if err != nil {
return fmt.Errorf("queue declare: %w", err)
}
// Bind queue ke exchange dengan routing key
return ch.QueueBind(
q.Name, // queue
"order.processing", // routing key
"orders", // exchange
false,
nil,
)
}
// Fanout exchange — broadcast ke semua queue
func setupFanoutExchange(ch *amqp.Channel) error {
err := ch.ExchangeDeclare("notifications", "fanout", true, false, false, false, nil)
if err != nil {
return err
}
// Setiap service punya queue-nya sendiri
for _, service := range []string{"email", "sms", "push"} {
q, err := ch.QueueDeclare(
"notification."+service, true, false, false, false, nil)
if err != nil {
return err
}
// Fanout tidak butuh routing key
if err := ch.QueueBind(q.Name, "", "notifications", false, nil); err != nil {
return err
}
}
return nil
}
// Topic exchange — routing dengan wildcard
func setupTopicExchange(ch *amqp.Channel) error {
err := ch.ExchangeDeclare("events", "topic", true, false, false, false, nil)
if err != nil {
return err
}
queues := []struct {
name string
routingKey string
}{
{"events.orders.all", "order.#"}, // semua event order
{"events.payments", "payment.*"}, // semua event payment
{"events.critical", "#.failed"}, // semua event failed
{"events.audit", "#"}, // semua event (audit log)
}
for _, q := range queues {
queue, err := ch.QueueDeclare(q.name, true, false, false, false, nil)
if err != nil {
return err
}
if err := ch.QueueBind(queue.Name, q.routingKey, "events", false, nil); err != nil {
return err
}
}
return nil
}
Publish — Mengirim Pesan #
func publish(ch *amqp.Channel, exchange, routingKey string, body interface{}) error {
data, err := json.Marshal(body)
if err != nil {
return err
}
return ch.PublishWithContext(
context.Background(),
exchange, // exchange
routingKey, // routing key
true, // mandatory: return jika tidak ada queue yang cocok
false, // immediate
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent, // tahan restart broker
MessageId: uuid.New().String(),
Timestamp: time.Now(),
Body: data,
Headers: amqp.Table{
"source": "order-service",
"version": "1.0",
},
},
)
}
// Publish dengan konfirmasi (Publisher Confirm)
func publishWithConfirm(ch *amqp.Channel, exchange, routingKey string, body interface{}) error {
// Aktifkan confirm mode
if err := ch.Confirm(false); err != nil {
return err
}
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
data, _ := json.Marshal(body)
if err := ch.PublishWithContext(context.Background(), exchange, routingKey, false, false,
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
Body: data,
}); err != nil {
return err
}
// Tunggu konfirmasi dari broker
confirm := <-confirms
if !confirm.Ack {
return fmt.Errorf("pesan tidak dikonfirmasi oleh broker")
}
return nil
}
Consume — Membaca Pesan #
// Prefetch — batasi jumlah pesan yang dikirim sebelum acknowledgment
func startConsumer(ch *amqp.Channel, queueName string, handler func(amqp.Delivery) error) error {
// Hanya kirim 5 pesan sekaligus; tunggu ACK sebelum kirim lebih
if err := ch.Qos(5, 0, false); err != nil {
return fmt.Errorf("qos: %w", err)
}
msgs, err := ch.Consume(
queueName, // queue
"", // consumer tag (kosong = auto-generate)
false, // auto-ack (false = manual ack)
false, // exclusive
false, // no-local
false, // no-wait
nil,
)
if err != nil {
return fmt.Errorf("consume: %w", err)
}
go func() {
for msg := range msgs {
if err := handler(msg); err != nil {
log.Printf("Error proses pesan: %v", err)
// Nack dengan requeue=false → masuk Dead Letter Queue
msg.Nack(false, false)
continue
}
// Ack — hapus pesan dari queue
msg.Ack(false)
}
}()
return nil
}
Dead Letter Queue (DLQ) #
Pesan yang gagal diproses (Nack tanpa requeue) masuk ke DLQ untuk diinspeksi atau di-retry:
func setupDLQ(ch *amqp.Channel) error {
// Exchange untuk DLQ
err := ch.ExchangeDeclare("orders.dlx", "direct", true, false, false, false, nil)
if err != nil {
return err
}
// Queue untuk pesan gagal
_, err = ch.QueueDeclare("order.failed", true, false, false, false, nil)
if err != nil {
return err
}
return ch.QueueBind("order.failed", "order.failed", "orders.dlx", false, nil)
}
// Consumer untuk DLQ — analisis dan retry manual
func consumeDLQ(ch *amqp.Channel) error {
return startConsumer(ch, "order.failed", func(msg amqp.Delivery) error {
log.Printf("DLQ: MessageID=%s", msg.MessageId)
log.Printf(" Headers: %v", msg.Headers)
log.Printf(" Body: %s", string(msg.Body))
// Analisis kenapa gagal dan putuskan: retry atau buang
retryCount, _ := msg.Headers["x-retry-count"].(int32)
if retryCount < 3 {
// Republish dengan retry count
msg.Headers["x-retry-count"] = retryCount + 1
return ch.PublishWithContext(context.Background(),
"orders", "order.processing", false, false,
amqp.Publishing{
ContentType: msg.ContentType,
DeliveryMode: amqp.Persistent,
Headers: msg.Headers,
Body: msg.Body,
})
}
// Sudah 3x retry, simpan ke permanent storage untuk review manual
log.Printf("PERMANENT FAILURE: %s", msg.MessageId)
return nil
})
}
Contoh Program Lengkap — Sistem Notifikasi #
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Notification struct {
ID string `json:"id"`
Type string `json:"type"` // email, sms, push
UserID string `json:"user_id"`
Subject string `json:"subject,omitempty"`
Message string `json:"message"`
Channel string `json:"channel"` // email, sms, push
Timestamp time.Time `json:"timestamp"`
}
func setupRabbitMQ(ch *amqp.Channel) error {
// Fanout exchange untuk broadcast notifikasi
if err := ch.ExchangeDeclare(
"notifications", "fanout", true, false, false, false, nil); err != nil {
return err
}
// Queue untuk setiap channel notifikasi
channels := []string{"email", "sms", "push"}
for _, name := range channels {
q, err := ch.QueueDeclare(
"notification."+name, true, false, false, false,
amqp.Table{"x-message-ttl": int32(86400000)}, // TTL 24 jam
)
if err != nil {
return err
}
if err := ch.QueueBind(q.Name, "", "notifications", false, nil); err != nil {
return err
}
}
return nil
}
func publishNotification(ch *amqp.Channel, notif Notification) error {
data, _ := json.Marshal(notif)
return ch.PublishWithContext(
context.Background(),
"notifications", "", false, false,
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
MessageId: notif.ID,
Timestamp: notif.Timestamp,
Body: data,
},
)
}
func startNotificationWorker(ch *amqp.Channel, channelName string) error {
ch.Qos(3, 0, false)
msgs, err := ch.Consume(
"notification."+channelName, "", false, false, false, false, nil)
if err != nil {
return err
}
go func() {
log.Printf("[%s] Worker siap", channelName)
for msg := range msgs {
var notif Notification
if err := json.Unmarshal(msg.Body, ¬if); err != nil {
log.Printf("[%s] ERROR unmarshal: %v", channelName, err)
msg.Nack(false, false)
continue
}
// Simulasi pengiriman berdasarkan channel
switch channelName {
case "email":
fmt.Printf(" 📧 [EMAIL] ke %s: %s\n", notif.UserID, notif.Subject)
time.Sleep(50 * time.Millisecond)
case "sms":
fmt.Printf(" 📱 [SMS] ke %s: %s\n", notif.UserID, notif.Message[:min(30, len(notif.Message))]+"...")
time.Sleep(30 * time.Millisecond)
case "push":
fmt.Printf(" 🔔 [PUSH] ke %s: %s\n", notif.UserID, notif.Message)
time.Sleep(10 * time.Millisecond)
}
msg.Ack(false)
}
}()
return nil
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal("Koneksi RabbitMQ:", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal("Channel:", err)
}
defer ch.Close()
if err := setupRabbitMQ(ch); err != nil {
log.Fatal("Setup:", err)
}
fmt.Println("✓ RabbitMQ siap")
// Start workers untuk setiap channel
for _, name := range []string{"email", "sms", "push"} {
if err := startNotificationWorker(ch, name); err != nil {
log.Fatal("Worker:", err)
}
}
// Publish beberapa notifikasi
fmt.Println("\n=== Mengirim Notifikasi ===")
notifications := []Notification{
{
ID: "notif-001", UserID: "USR-42",
Subject: "Pesanan Dikonfirmasi",
Message: "Pesanan ORD-001 senilai Rp1.500.000 telah dikonfirmasi.",
Timestamp: time.Now(),
},
{
ID: "notif-002", UserID: "USR-99",
Subject: "Pembayaran Berhasil",
Message: "Pembayaran untuk ORD-002 berhasil diterima.",
Timestamp: time.Now(),
},
{
ID: "notif-003", UserID: "USR-42",
Subject: "Pesanan Sedang Dikirim",
Message: "Pesanan ORD-001 sedang dalam perjalanan. No. resi: JNE123456.",
Timestamp: time.Now(),
},
}
for _, n := range notifications {
if err := publishNotification(ch, n); err != nil {
log.Printf("Gagal publish: %v", err)
} else {
fmt.Printf(" Dikirim: %s (to: %s)\n", n.ID, n.UserID)
}
}
// Tunggu consumer memproses
fmt.Println("\n=== Memproses Notifikasi ===")
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
fmt.Println("\nShutdown...")
case <-time.After(3 * time.Second):
fmt.Println("\nSelesai.")
}
}
Ringkasan #
amqp091-goadalah library resmi untuk RabbitMQ di Go — fork daristreadway/amqpyang tidak lagi dikelola.- Exchange types:
direct(routing key tepat),fanout(broadcast),topic(wildcard*dan#),headers.DeliveryMode: amqp.Persistentwajib untuk pesan yang tidak boleh hilang saat broker restart.- Manual ACK (
autoAck=false) denganmsg.Ack()setelah berhasil proses,msg.Nack(false, false)untuk DLQ.ch.Qos(n, 0, false)untuk prefetch — batasi berapa pesan dikirim sebelum worker memberi ACK.- Dead Letter Queue untuk pesan yang gagal — Nack tanpa requeue → masuk DLQ untuk analisis/retry.
- Publisher Confirm untuk jaminan pesan sampai ke broker — aktifkan dengan
ch.Confirm(false).- Reconnection logic penting untuk production — pantau
conn.NotifyClose()dan reconnect otomatis.x-message-ttluntuk expiry pesan yang tidak diproses;x-dead-letter-exchangeuntuk routing pesan expired/gagal.- Topic exchange dengan
#(nol atau lebih kata) dan*(tepat satu kata) untuk routing yang sangat fleksibel.