Google Pub/Sub #

Google Cloud Pub/Sub adalah layanan messaging asinkron dan streaming data yang terkelola sepenuhnya di Google Cloud Platform. Ia menawarkan jaminan at-least-once delivery, ordering guarantees (dengan ordering key), dan integrasi native dengan ekosistem GCP seperti Cloud Functions, Dataflow, dan BigQuery. Berbeda dari Kafka (self-managed) dan RabbitMQ (protokol AMQP), Pub/Sub adalah fully-managed service — tidak ada server yang dikonfigurasi.

Konsep Dasar Pub/Sub #

Topic       → saluran untuk mempublikasikan pesan
Subscription → langganan ke topic; setiap subscription punya copy pesan sendiri
Publisher   → mengirim pesan ke topic
Subscriber  → menerima pesan dari subscription (pull atau push)
Message ID  → ID unik yang ditetapkan GCP untuk setiap pesan
Ack ID      → ID untuk mengakui pesan sudah diproses

Instalasi #

go get cloud.google.com/go/pubsub

Setup Client #

import "cloud.google.com/go/pubsub"

func newPubSubClient(ctx context.Context, projectID string) (*pubsub.Client, error) {
    // Autentikasi via Application Default Credentials (ADC):
    // - GOOGLE_APPLICATION_CREDENTIALS env var
    // - gcloud auth application-default login
    // - Service account di GCE/GKE/Cloud Run
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        return nil, fmt.Errorf("buat client: %w", err)
    }
    return client, nil
}

// Untuk emulator lokal (development)
func newEmulatorClient(ctx context.Context) (*pubsub.Client, error) {
    // Set env: PUBSUB_EMULATOR_HOST=localhost:8085
    return pubsub.NewClient(ctx, "local-project")
}

Membuat Topic dan Subscription #

func createTopic(ctx context.Context, client *pubsub.Client, topicID string) (*pubsub.Topic, error) {
    // Buat topic jika belum ada
    topic, err := client.CreateTopic(ctx, topicID)
    if err != nil {
        // Jika sudah ada, ambil topic yang existing
        if status.Code(err) == codes.AlreadyExists {
            return client.Topic(topicID), nil
        }
        return nil, fmt.Errorf("create topic: %w", err)
    }

    // Konfigurasi topic
    cfg := pubsub.TopicConfig{
        MessageStoragePolicy: pubsub.MessageStoragePolicy{
            AllowedPersistenceRegions: []string{"asia-southeast1"},
        },
        // Schema validation (opsional)
        // SchemaSettings: &pubsub.SchemaSettings{...},
    }
    if _, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
        MessageStoragePolicy: cfg.MessageStoragePolicy,
    }); err != nil {
        log.Printf("Update topic config: %v", err)
    }

    return topic, nil
}

func createSubscription(ctx context.Context, client *pubsub.Client,
    subID, topicID string) (*pubsub.Subscription, error) {

    topic := client.Topic(topicID)

    sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
        Topic:            topic,
        AckDeadline:      30 * time.Second,  // batas waktu proses sebelum redeliver
        RetainAckedMessages: false,

        // Dead letter topic untuk pesan yang gagal berulang kali
        DeadLetterPolicy: &pubsub.DeadLetterPolicy{
            DeadLetterTopic:     "projects/myproject/topics/dead-letter",
            MaxDeliveryAttempts: 5,
        },

        // Retry policy
        RetryPolicy: &pubsub.RetryPolicy{
            MinimumBackoff: 10 * time.Second,
            MaximumBackoff: 600 * time.Second,
        },

        // Filter — hanya terima pesan dengan atribut tertentu
        Filter: `attributes.source = "order-service"`,

        // Ordering — aktifkan jika publisher menggunakan ordering key
        EnableMessageOrdering: true,
    })
    if err != nil {
        if status.Code(err) == codes.AlreadyExists {
            return client.Subscription(subID), nil
        }
        return nil, fmt.Errorf("create subscription: %w", err)
    }
    return sub, nil
}

Publish — Mengirim Pesan #

func publish(ctx context.Context, topic *pubsub.Topic, data interface{}, attrs map[string]string) error {
    body, err := json.Marshal(data)
    if err != nil {
        return err
    }

    msg := &pubsub.Message{
        Data:       body,
        Attributes: attrs,
        // OrderingKey — pesan dengan key yang sama dikirim secara berurutan
        // (subscription harus EnableMessageOrdering=true)
        // OrderingKey: "order-" + orderID,
    }

    result := topic.Publish(ctx, msg)

    // Tunggu konfirmasi dari server (blocking)
    msgID, err := result.Get(ctx)
    if err != nil {
        return fmt.Errorf("publish: %w", err)
    }

    log.Printf("Pesan dikirim, ID: %s", msgID)
    return nil
}

// Publish dengan ordering key
func publishOrdered(ctx context.Context, topic *pubsub.Topic,
    orderingKey string, data interface{}) error {

    body, _ := json.Marshal(data)
    result := topic.Publish(ctx, &pubsub.Message{
        Data:        body,
        OrderingKey: orderingKey,
    })

    _, err := result.Get(ctx)
    return err
}

// Batch publish — kirim banyak pesan, Pub/Sub auto-batch
func publishBatch(ctx context.Context, topic *pubsub.Topic, messages []interface{}) error {
    // Konfigurasikan batch settings
    topic.PublishSettings = pubsub.PublishSettings{
        ByteThreshold:  1e6,             // flush saat batch mencapai 1MB
        CountThreshold: 100,             // flush saat 100 pesan
        DelayThreshold: 10 * time.Millisecond, // flush setiap 10ms
    }

    results := make([]*pubsub.PublishResult, len(messages))
    for i, msg := range messages {
        body, _ := json.Marshal(msg)
        results[i] = topic.Publish(ctx, &pubsub.Message{Data: body})
    }

    // Tunggu semua publish selesai
    var errors []error
    for i, r := range results {
        if _, err := r.Get(ctx); err != nil {
            errors = append(errors, fmt.Errorf("pesan %d: %w", i, err))
        }
    }

    if len(errors) > 0 {
        return fmt.Errorf("%d pesan gagal dikirim", len(errors))
    }
    return nil
}

Receive — Menerima Pesan (Pull) #

func receive(ctx context.Context, sub *pubsub.Subscription,
    handler func(ctx context.Context, msg *pubsub.Message) error) error {

    // Konfigurasi receive settings
    sub.ReceiveSettings = pubsub.ReceiveSettings{
        MaxExtension:           60 * time.Minute, // max waktu ekstensi ack deadline
        MaxExtensionPeriod:     10 * time.Minute,
        MaxOutstandingMessages: 100,              // max pesan yang diproses bersamaan
        MaxOutstandingBytes:    1e9,              // 1GB max data outstanding
        NumGoroutines:          5,                // goroutine paralel untuk receive
    }

    return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        log.Printf("Pesan diterima: ID=%s, PublishTime=%s",
            msg.ID, msg.PublishTime.Format(time.RFC3339))
        log.Printf("  Attributes: %v", msg.Attributes)
        log.Printf("  OrderingKey: %s", msg.OrderingKey)

        if err := handler(ctx, msg); err != nil {
            log.Printf("Error proses pesan: %v", err)
            // Nack — Pub/Sub akan redeliver sesuai retry policy
            msg.Nack()
            return
        }

        // Ack — tandai pesan sudah berhasil diproses
        msg.Ack()
    })
}

// Receive dengan timeout
func receiveWithTimeout(ctx context.Context, sub *pubsub.Subscription,
    timeout time.Duration, handler func(*pubsub.Message) error) error {

    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        if err := handler(msg); err != nil {
            msg.Nack()
            return
        }
        msg.Ack()
    })
}

Push Subscription — Server Menerima via HTTP #

// Untuk push subscription, GCP mengirim HTTP POST ke endpoint kamu
// Format body: {"message": {"data": "base64...", "attributes": {...}}, "subscription": "..."}

type PushMessage struct {
    Message struct {
        Data       string            `json:"data"`
        Attributes map[string]string `json:"attributes"`
        MessageID  string            `json:"messageId"`
        PublishTime string           `json:"publishTime"`
    } `json:"message"`
    Subscription string `json:"subscription"`
}

func pubsubPushHandler(w http.ResponseWriter, r *http.Request) {
    var pushMsg PushMessage
    if err := json.NewDecoder(r.Body).Decode(&pushMsg); err != nil {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }

    // Decode base64 data
    data, err := base64.StdEncoding.DecodeString(pushMsg.Message.Data)
    if err != nil {
        http.Error(w, "bad data", http.StatusBadRequest)
        return
    }

    log.Printf("Push message: ID=%s, data=%s",
        pushMsg.Message.MessageID, string(data))

    // Proses pesan
    if err := processEvent(data, pushMsg.Message.Attributes); err != nil {
        // Return non-200 → Pub/Sub akan retry
        http.Error(w, "processing failed", http.StatusInternalServerError)
        return
    }

    // Return 200 → Pub/Sub anggap pesan sudah di-ack
    w.WriteHeader(http.StatusNoContent)
}

Contoh Program Lengkap — Event Streaming Pipeline #

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os/signal"
    "sync"
    "syscall"
    "time"
    "os"

    "cloud.google.com/go/pubsub"
)

const (
    projectID    = "my-gcp-project"
    topicOrders  = "orders"
    subOrders    = "orders-processor"
)

type OrderEvent struct {
    EventID    string    `json:"event_id"`
    Type       string    `json:"type"`
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Amount     float64   `json:"amount"`
    Items      []string  `json:"items"`
    Timestamp  time.Time `json:"timestamp"`
}

// EventPublisher mengirim event ke Pub/Sub
type EventPublisher struct {
    topic *pubsub.Topic
}

func NewEventPublisher(ctx context.Context, client *pubsub.Client, topicID string) (*EventPublisher, error) {
    topic := client.Topic(topicID)
    ok, err := topic.Exists(ctx)
    if err != nil {
        return nil, err
    }
    if !ok {
        topic, err = client.CreateTopic(ctx, topicID)
        if err != nil {
            return nil, err
        }
    }

    topic.PublishSettings = pubsub.PublishSettings{
        CountThreshold: 100,
        DelayThreshold: 10 * time.Millisecond,
    }

    return &EventPublisher{topic: topic}, nil
}

func (p *EventPublisher) Publish(ctx context.Context, event OrderEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }

    result := p.topic.Publish(ctx, &pubsub.Message{
        Data: data,
        Attributes: map[string]string{
            "event_type": event.Type,
            "source":     "order-service",
            "version":    "1.0",
        },
        // Gunakan OrderID sebagai ordering key — event untuk order yang sama selalu berurutan
        OrderingKey: event.OrderID,
    })

    msgID, err := result.Get(ctx)
    if err != nil {
        return fmt.Errorf("publish: %w", err)
    }
    log.Printf("[PUB] event=%s order=%s msgID=%s", event.Type, event.OrderID, msgID)
    return nil
}

func (p *EventPublisher) Close() {
    p.topic.Stop()
}

// EventProcessor memproses event dari Pub/Sub
type EventProcessor struct {
    sub     *pubsub.Subscription
    stats   map[string]int
    statsMu sync.Mutex
}

func NewEventProcessor(ctx context.Context, client *pubsub.Client, subID, topicID string) (*EventProcessor, error) {
    topic := client.Topic(topicID)

    sub := client.Subscription(subID)
    ok, err := sub.Exists(ctx)
    if err != nil {
        return nil, err
    }
    if !ok {
        sub, err = client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
            Topic:                 topic,
            AckDeadline:           30 * time.Second,
            EnableMessageOrdering: true,
        })
        if err != nil {
            return nil, err
        }
    }

    sub.ReceiveSettings = pubsub.ReceiveSettings{
        MaxOutstandingMessages: 50,
        NumGoroutines:          3,
    }

    return &EventProcessor{sub: sub, stats: make(map[string]int)}, nil
}

func (p *EventProcessor) Start(ctx context.Context) error {
    log.Println("[SUB] Mulai memproses event...")

    return p.sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        var event OrderEvent
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            log.Printf("[SUB] ERROR unmarshal: %v", err)
            msg.Nack()
            return
        }

        log.Printf("[SUB] ID=%s type=%s order=%s amount=Rp%.0f",
            msg.ID[:8]+"...", event.Type, event.OrderID, event.Amount)

        if err := p.process(event); err != nil {
            log.Printf("[SUB] GAGAL proses %s: %v", event.OrderID, err)
            msg.Nack()
            return
        }

        p.statsMu.Lock()
        p.stats[event.Type]++
        p.statsMu.Unlock()

        msg.Ack()
    })
}

func (p *EventProcessor) process(event OrderEvent) error {
    switch event.Type {
    case "order.created":
        fmt.Printf("  → Buat record order %s (Rp%.0f)\n", event.OrderID, event.Amount)
        time.Sleep(50 * time.Millisecond)
    case "order.paid":
        fmt.Printf("  → Tandai paid dan kirim notifikasi: %s\n", event.OrderID)
        time.Sleep(30 * time.Millisecond)
    case "order.shipped":
        fmt.Printf("  → Update tracking: %s\n", event.OrderID)
        time.Sleep(20 * time.Millisecond)
    case "order.completed":
        fmt.Printf("  → Selesaikan order dan update stats: %s\n", event.OrderID)
        time.Sleep(40 * time.Millisecond)
    }
    return nil
}

func (p *EventProcessor) PrintStats() {
    p.statsMu.Lock()
    defer p.statsMu.Unlock()
    fmt.Println("\n=== Statistik Pemrosesan ===")
    total := 0
    for t, c := range p.stats {
        fmt.Printf("  %-20s: %d event\n", t, c)
        total += c
    }
    fmt.Printf("  %-20s: %d event\n", "TOTAL", total)
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        log.Println("Menerima sinyal, shutdown...")
        cancel()
    }()

    // Setup client
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        log.Fatal("Client:", err)
    }
    defer client.Close()

    // Publisher
    publisher, err := NewEventPublisher(ctx, client, topicOrders)
    if err != nil {
        log.Fatal("Publisher:", err)
    }
    defer publisher.Close()

    // Processor
    processor, err := NewEventProcessor(ctx, client, subOrders, topicOrders)
    if err != nil {
        log.Fatal("Processor:", err)
    }

    // Start processor di goroutine
    go func() {
        if err := processor.Start(ctx); err != nil && ctx.Err() == nil {
            log.Printf("Processor error: %v", err)
        }
    }()

    // Publish events
    fmt.Println("=== Mengirim Order Events ===")
    orders := []struct {
        id     string
        events []string
        amount float64
        items  []string
    }{
        {"ORD-001", []string{"order.created", "order.paid", "order.shipped", "order.completed"},
            1_500_000, []string{"Laptop", "Mouse"}},
        {"ORD-002", []string{"order.created", "order.paid"},
            350_000, []string{"Keyboard"}},
        {"ORD-003", []string{"order.created"},
            85_000, []string{"Kaos"}},
    }

    for _, order := range orders {
        for i, evtType := range order.events {
            event := OrderEvent{
                EventID:    fmt.Sprintf("evt-%s-%d", order.id, i),
                Type:       evtType,
                OrderID:    order.id,
                CustomerID: "CUST-42",
                Amount:     order.amount,
                Items:      order.items,
                Timestamp:  time.Now(),
            }
            if err := publisher.Publish(ctx, event); err != nil {
                log.Printf("Gagal publish: %v", err)
            }
            time.Sleep(100 * time.Millisecond)
        }
    }

    // Tunggu pemrosesan
    fmt.Println("\n=== Memproses Events ===")
    select {
    case <-ctx.Done():
    case <-time.After(5 * time.Second):
        cancel()
    }

    processor.PrintStats()
    fmt.Println("Selesai.")
}

Ringkasan #

  • Pub/Sub adalah fully-managed service di GCP — tidak perlu mengelola broker, replikasi, atau partisi.
  • Topic untuk publish; Subscription untuk receive — setiap subscription mendapat copy pesan sendiri.
  • topic.Publish() mengembalikan PublishResult — panggil .Get(ctx) untuk konfirmasi pengiriman.
  • PublishSettings mengontrol batching otomatis — sesuaikan CountThreshold dan DelayThreshold.
  • OrderingKey menjamin urutan pesan untuk key yang sama — aktifkan EnableMessageOrdering di subscription.
  • msg.Ack() untuk konfirmasi proses berhasil; msg.Nack() untuk redeliver sesuai retry policy.
  • ReceiveSettings.NumGoroutines untuk parallelism; MaxOutstandingMessages untuk backpressure.
  • Dead letter topic untuk pesan yang gagal setelah MaxDeliveryAttempts kali.
  • Filter di subscription (attributes.source = "order-service") untuk filtering tanpa kode tambahan.
  • Push subscription untuk serverless (Cloud Functions, Cloud Run) — GCP POST ke endpoint HTTP kamu.

← Sebelumnya: Amazon SQS   Berikutnya: Redis →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact