Google Pub/Sub #
Google Cloud Pub/Sub adalah layanan messaging asinkron dan streaming data yang terkelola sepenuhnya di Google Cloud Platform. Ia menawarkan jaminan at-least-once delivery, ordering guarantees (dengan ordering key), dan integrasi native dengan ekosistem GCP seperti Cloud Functions, Dataflow, dan BigQuery. Berbeda dari Kafka (self-managed) dan RabbitMQ (protokol AMQP), Pub/Sub adalah fully-managed service — tidak ada server yang dikonfigurasi.
Konsep Dasar Pub/Sub #
Topic → saluran untuk mempublikasikan pesan
Subscription → langganan ke topic; setiap subscription punya copy pesan sendiri
Publisher → mengirim pesan ke topic
Subscriber → menerima pesan dari subscription (pull atau push)
Message ID → ID unik yang ditetapkan GCP untuk setiap pesan
Ack ID → ID untuk mengakui pesan sudah diproses
Instalasi #
go get cloud.google.com/go/pubsub
Setup Client #
import "cloud.google.com/go/pubsub"
func newPubSubClient(ctx context.Context, projectID string) (*pubsub.Client, error) {
// Autentikasi via Application Default Credentials (ADC):
// - GOOGLE_APPLICATION_CREDENTIALS env var
// - gcloud auth application-default login
// - Service account di GCE/GKE/Cloud Run
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("buat client: %w", err)
}
return client, nil
}
// Untuk emulator lokal (development)
func newEmulatorClient(ctx context.Context) (*pubsub.Client, error) {
// Set env: PUBSUB_EMULATOR_HOST=localhost:8085
return pubsub.NewClient(ctx, "local-project")
}
Membuat Topic dan Subscription #
func createTopic(ctx context.Context, client *pubsub.Client, topicID string) (*pubsub.Topic, error) {
// Buat topic jika belum ada
topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
// Jika sudah ada, ambil topic yang existing
if status.Code(err) == codes.AlreadyExists {
return client.Topic(topicID), nil
}
return nil, fmt.Errorf("create topic: %w", err)
}
// Konfigurasi topic
cfg := pubsub.TopicConfig{
MessageStoragePolicy: pubsub.MessageStoragePolicy{
AllowedPersistenceRegions: []string{"asia-southeast1"},
},
// Schema validation (opsional)
// SchemaSettings: &pubsub.SchemaSettings{...},
}
if _, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
MessageStoragePolicy: cfg.MessageStoragePolicy,
}); err != nil {
log.Printf("Update topic config: %v", err)
}
return topic, nil
}
func createSubscription(ctx context.Context, client *pubsub.Client,
subID, topicID string) (*pubsub.Subscription, error) {
topic := client.Topic(topicID)
sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 30 * time.Second, // batas waktu proses sebelum redeliver
RetainAckedMessages: false,
// Dead letter topic untuk pesan yang gagal berulang kali
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: "projects/myproject/topics/dead-letter",
MaxDeliveryAttempts: 5,
},
// Retry policy
RetryPolicy: &pubsub.RetryPolicy{
MinimumBackoff: 10 * time.Second,
MaximumBackoff: 600 * time.Second,
},
// Filter — hanya terima pesan dengan atribut tertentu
Filter: `attributes.source = "order-service"`,
// Ordering — aktifkan jika publisher menggunakan ordering key
EnableMessageOrdering: true,
})
if err != nil {
if status.Code(err) == codes.AlreadyExists {
return client.Subscription(subID), nil
}
return nil, fmt.Errorf("create subscription: %w", err)
}
return sub, nil
}
Publish — Mengirim Pesan #
func publish(ctx context.Context, topic *pubsub.Topic, data interface{}, attrs map[string]string) error {
body, err := json.Marshal(data)
if err != nil {
return err
}
msg := &pubsub.Message{
Data: body,
Attributes: attrs,
// OrderingKey — pesan dengan key yang sama dikirim secara berurutan
// (subscription harus EnableMessageOrdering=true)
// OrderingKey: "order-" + orderID,
}
result := topic.Publish(ctx, msg)
// Tunggu konfirmasi dari server (blocking)
msgID, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("publish: %w", err)
}
log.Printf("Pesan dikirim, ID: %s", msgID)
return nil
}
// Publish dengan ordering key
func publishOrdered(ctx context.Context, topic *pubsub.Topic,
orderingKey string, data interface{}) error {
body, _ := json.Marshal(data)
result := topic.Publish(ctx, &pubsub.Message{
Data: body,
OrderingKey: orderingKey,
})
_, err := result.Get(ctx)
return err
}
// Batch publish — kirim banyak pesan, Pub/Sub auto-batch
func publishBatch(ctx context.Context, topic *pubsub.Topic, messages []interface{}) error {
// Konfigurasikan batch settings
topic.PublishSettings = pubsub.PublishSettings{
ByteThreshold: 1e6, // flush saat batch mencapai 1MB
CountThreshold: 100, // flush saat 100 pesan
DelayThreshold: 10 * time.Millisecond, // flush setiap 10ms
}
results := make([]*pubsub.PublishResult, len(messages))
for i, msg := range messages {
body, _ := json.Marshal(msg)
results[i] = topic.Publish(ctx, &pubsub.Message{Data: body})
}
// Tunggu semua publish selesai
var errors []error
for i, r := range results {
if _, err := r.Get(ctx); err != nil {
errors = append(errors, fmt.Errorf("pesan %d: %w", i, err))
}
}
if len(errors) > 0 {
return fmt.Errorf("%d pesan gagal dikirim", len(errors))
}
return nil
}
Receive — Menerima Pesan (Pull) #
func receive(ctx context.Context, sub *pubsub.Subscription,
handler func(ctx context.Context, msg *pubsub.Message) error) error {
// Konfigurasi receive settings
sub.ReceiveSettings = pubsub.ReceiveSettings{
MaxExtension: 60 * time.Minute, // max waktu ekstensi ack deadline
MaxExtensionPeriod: 10 * time.Minute,
MaxOutstandingMessages: 100, // max pesan yang diproses bersamaan
MaxOutstandingBytes: 1e9, // 1GB max data outstanding
NumGoroutines: 5, // goroutine paralel untuk receive
}
return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
log.Printf("Pesan diterima: ID=%s, PublishTime=%s",
msg.ID, msg.PublishTime.Format(time.RFC3339))
log.Printf(" Attributes: %v", msg.Attributes)
log.Printf(" OrderingKey: %s", msg.OrderingKey)
if err := handler(ctx, msg); err != nil {
log.Printf("Error proses pesan: %v", err)
// Nack — Pub/Sub akan redeliver sesuai retry policy
msg.Nack()
return
}
// Ack — tandai pesan sudah berhasil diproses
msg.Ack()
})
}
// Receive dengan timeout
func receiveWithTimeout(ctx context.Context, sub *pubsub.Subscription,
timeout time.Duration, handler func(*pubsub.Message) error) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
if err := handler(msg); err != nil {
msg.Nack()
return
}
msg.Ack()
})
}
Push Subscription — Server Menerima via HTTP #
// Untuk push subscription, GCP mengirim HTTP POST ke endpoint kamu
// Format body: {"message": {"data": "base64...", "attributes": {...}}, "subscription": "..."}
type PushMessage struct {
Message struct {
Data string `json:"data"`
Attributes map[string]string `json:"attributes"`
MessageID string `json:"messageId"`
PublishTime string `json:"publishTime"`
} `json:"message"`
Subscription string `json:"subscription"`
}
func pubsubPushHandler(w http.ResponseWriter, r *http.Request) {
var pushMsg PushMessage
if err := json.NewDecoder(r.Body).Decode(&pushMsg); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
// Decode base64 data
data, err := base64.StdEncoding.DecodeString(pushMsg.Message.Data)
if err != nil {
http.Error(w, "bad data", http.StatusBadRequest)
return
}
log.Printf("Push message: ID=%s, data=%s",
pushMsg.Message.MessageID, string(data))
// Proses pesan
if err := processEvent(data, pushMsg.Message.Attributes); err != nil {
// Return non-200 → Pub/Sub akan retry
http.Error(w, "processing failed", http.StatusInternalServerError)
return
}
// Return 200 → Pub/Sub anggap pesan sudah di-ack
w.WriteHeader(http.StatusNoContent)
}
Contoh Program Lengkap — Event Streaming Pipeline #
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
"time"
"os"
"cloud.google.com/go/pubsub"
)
const (
projectID = "my-gcp-project"
topicOrders = "orders"
subOrders = "orders-processor"
)
type OrderEvent struct {
EventID string `json:"event_id"`
Type string `json:"type"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Amount float64 `json:"amount"`
Items []string `json:"items"`
Timestamp time.Time `json:"timestamp"`
}
// EventPublisher mengirim event ke Pub/Sub
type EventPublisher struct {
topic *pubsub.Topic
}
func NewEventPublisher(ctx context.Context, client *pubsub.Client, topicID string) (*EventPublisher, error) {
topic := client.Topic(topicID)
ok, err := topic.Exists(ctx)
if err != nil {
return nil, err
}
if !ok {
topic, err = client.CreateTopic(ctx, topicID)
if err != nil {
return nil, err
}
}
topic.PublishSettings = pubsub.PublishSettings{
CountThreshold: 100,
DelayThreshold: 10 * time.Millisecond,
}
return &EventPublisher{topic: topic}, nil
}
func (p *EventPublisher) Publish(ctx context.Context, event OrderEvent) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
result := p.topic.Publish(ctx, &pubsub.Message{
Data: data,
Attributes: map[string]string{
"event_type": event.Type,
"source": "order-service",
"version": "1.0",
},
// Gunakan OrderID sebagai ordering key — event untuk order yang sama selalu berurutan
OrderingKey: event.OrderID,
})
msgID, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("publish: %w", err)
}
log.Printf("[PUB] event=%s order=%s msgID=%s", event.Type, event.OrderID, msgID)
return nil
}
func (p *EventPublisher) Close() {
p.topic.Stop()
}
// EventProcessor memproses event dari Pub/Sub
type EventProcessor struct {
sub *pubsub.Subscription
stats map[string]int
statsMu sync.Mutex
}
func NewEventProcessor(ctx context.Context, client *pubsub.Client, subID, topicID string) (*EventProcessor, error) {
topic := client.Topic(topicID)
sub := client.Subscription(subID)
ok, err := sub.Exists(ctx)
if err != nil {
return nil, err
}
if !ok {
sub, err = client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 30 * time.Second,
EnableMessageOrdering: true,
})
if err != nil {
return nil, err
}
}
sub.ReceiveSettings = pubsub.ReceiveSettings{
MaxOutstandingMessages: 50,
NumGoroutines: 3,
}
return &EventProcessor{sub: sub, stats: make(map[string]int)}, nil
}
func (p *EventProcessor) Start(ctx context.Context) error {
log.Println("[SUB] Mulai memproses event...")
return p.sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
var event OrderEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.Printf("[SUB] ERROR unmarshal: %v", err)
msg.Nack()
return
}
log.Printf("[SUB] ID=%s type=%s order=%s amount=Rp%.0f",
msg.ID[:8]+"...", event.Type, event.OrderID, event.Amount)
if err := p.process(event); err != nil {
log.Printf("[SUB] GAGAL proses %s: %v", event.OrderID, err)
msg.Nack()
return
}
p.statsMu.Lock()
p.stats[event.Type]++
p.statsMu.Unlock()
msg.Ack()
})
}
func (p *EventProcessor) process(event OrderEvent) error {
switch event.Type {
case "order.created":
fmt.Printf(" → Buat record order %s (Rp%.0f)\n", event.OrderID, event.Amount)
time.Sleep(50 * time.Millisecond)
case "order.paid":
fmt.Printf(" → Tandai paid dan kirim notifikasi: %s\n", event.OrderID)
time.Sleep(30 * time.Millisecond)
case "order.shipped":
fmt.Printf(" → Update tracking: %s\n", event.OrderID)
time.Sleep(20 * time.Millisecond)
case "order.completed":
fmt.Printf(" → Selesaikan order dan update stats: %s\n", event.OrderID)
time.Sleep(40 * time.Millisecond)
}
return nil
}
func (p *EventProcessor) PrintStats() {
p.statsMu.Lock()
defer p.statsMu.Unlock()
fmt.Println("\n=== Statistik Pemrosesan ===")
total := 0
for t, c := range p.stats {
fmt.Printf(" %-20s: %d event\n", t, c)
total += c
}
fmt.Printf(" %-20s: %d event\n", "TOTAL", total)
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("Menerima sinyal, shutdown...")
cancel()
}()
// Setup client
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
log.Fatal("Client:", err)
}
defer client.Close()
// Publisher
publisher, err := NewEventPublisher(ctx, client, topicOrders)
if err != nil {
log.Fatal("Publisher:", err)
}
defer publisher.Close()
// Processor
processor, err := NewEventProcessor(ctx, client, subOrders, topicOrders)
if err != nil {
log.Fatal("Processor:", err)
}
// Start processor di goroutine
go func() {
if err := processor.Start(ctx); err != nil && ctx.Err() == nil {
log.Printf("Processor error: %v", err)
}
}()
// Publish events
fmt.Println("=== Mengirim Order Events ===")
orders := []struct {
id string
events []string
amount float64
items []string
}{
{"ORD-001", []string{"order.created", "order.paid", "order.shipped", "order.completed"},
1_500_000, []string{"Laptop", "Mouse"}},
{"ORD-002", []string{"order.created", "order.paid"},
350_000, []string{"Keyboard"}},
{"ORD-003", []string{"order.created"},
85_000, []string{"Kaos"}},
}
for _, order := range orders {
for i, evtType := range order.events {
event := OrderEvent{
EventID: fmt.Sprintf("evt-%s-%d", order.id, i),
Type: evtType,
OrderID: order.id,
CustomerID: "CUST-42",
Amount: order.amount,
Items: order.items,
Timestamp: time.Now(),
}
if err := publisher.Publish(ctx, event); err != nil {
log.Printf("Gagal publish: %v", err)
}
time.Sleep(100 * time.Millisecond)
}
}
// Tunggu pemrosesan
fmt.Println("\n=== Memproses Events ===")
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
cancel()
}
processor.PrintStats()
fmt.Println("Selesai.")
}
Ringkasan #
- Pub/Sub adalah fully-managed service di GCP — tidak perlu mengelola broker, replikasi, atau partisi.
- Topic untuk publish; Subscription untuk receive — setiap subscription mendapat copy pesan sendiri.
topic.Publish()mengembalikanPublishResult— panggil.Get(ctx)untuk konfirmasi pengiriman.PublishSettingsmengontrol batching otomatis — sesuaikanCountThresholddanDelayThreshold.OrderingKeymenjamin urutan pesan untuk key yang sama — aktifkanEnableMessageOrderingdi subscription.msg.Ack()untuk konfirmasi proses berhasil;msg.Nack()untuk redeliver sesuai retry policy.ReceiveSettings.NumGoroutinesuntuk parallelism;MaxOutstandingMessagesuntuk backpressure.- Dead letter topic untuk pesan yang gagal setelah
MaxDeliveryAttemptskali.- Filter di subscription (
attributes.source = "order-service") untuk filtering tanpa kode tambahan.- Push subscription untuk serverless (Cloud Functions, Cloud Run) — GCP POST ke endpoint HTTP kamu.