Kafka #

Apache Kafka adalah distributed event streaming platform yang dirancang untuk menangani jutaan event per detik dengan latensi sangat rendah. Berbeda dari message broker tradisional seperti RabbitMQ yang menghapus pesan setelah dikonsumsi, Kafka menyimpan semua event dalam log yang bisa di-replay — sangat berguna untuk audit trail, event sourcing, dan stream processing. Go mendukung Kafka melalui dua library populer: github.com/IBM/sarama (pure Go, lebih fleksibel) dan github.com/confluentinc/confluent-kafka-go (wrapper librdkafka, performanya lebih tinggi).

Konsep Dasar Kafka #

Topic      → kategori pesan (seperti "channel" atau "queue name")
Partition  → subdivisi topic untuk parallelism; setiap topic punya 1+ partisi
Offset     → posisi unik setiap pesan dalam partisi (monotonically increasing)
Producer   → aplikasi yang menulis pesan ke topic
Consumer   → aplikasi yang membaca pesan dari topic
Consumer Group → sekelompok consumer yang bekerja sama membaca topic;
               setiap partisi hanya dibaca oleh satu consumer dalam group
Broker     → server Kafka; cluster terdiri dari banyak broker

Instalasi #

# Sarama — pure Go, lebih fleksibel
go get github.com/IBM/sarama

# Atau Confluent — perlu librdkafka C library
go get github.com/confluentinc/confluent-kafka-go/v2/kafka

Producer — Mengirim Pesan #

Sarama Sync Producer #

import (
    "github.com/IBM/sarama"
    "encoding/json"
)

// newSyncProducer builds a Sarama SyncProducer for the given brokers.
//
// The producer is idempotent (retries do not create duplicates), waits for
// every replica to acknowledge a message, and retries a failed send up to
// five times.
func newSyncProducer(brokers []string) (sarama.SyncProducer, error) {
    cfg := sarama.NewConfig()

    // Idempotence: limiting in-flight requests to one is mandatory for it.
    cfg.Producer.Idempotent = true
    cfg.Net.MaxOpenRequests = 1

    // Delivery guarantees: all replicas must acknowledge, with bounded retries.
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Retry.Max = 5

    // Have successful deliveries reported back to the producer.
    cfg.Producer.Return.Successes = true

    syncProducer, err := sarama.NewSyncProducer(brokers, cfg)
    return syncProducer, err
}

// sendMessage JSON-encodes value and publishes it to topic under key using the
// synchronous producer. Two record headers ("content-type" and "source") are
// attached to every message.
//
// It returns a wrapped error when encoding or sending fails; on success the
// assigned partition and offset are logged and nil is returned.
func sendMessage(producer sarama.SyncProducer, topic string, key string, value interface{}) error {
    payload, err := json.Marshal(value)
    if err != nil {
        return fmt.Errorf("marshal: %w", err)
    }

    headers := []sarama.RecordHeader{
        {Key: []byte("content-type"), Value: []byte("application/json")},
        {Key: []byte("source"), Value: []byte("myapp")},
    }
    record := &sarama.ProducerMessage{
        Topic:   topic,
        Key:     sarama.StringEncoder(key),
        Value:   sarama.ByteEncoder(payload),
        Headers: headers,
    }

    part, off, sendErr := producer.SendMessage(record)
    if sendErr != nil {
        return fmt.Errorf("send: %w", sendErr)
    }

    log.Printf("Pesan terkirim ke partition=%d offset=%d", part, off)
    return nil
}

Sarama Async Producer — Throughput Tinggi #

// newAsyncProducer builds a Sarama AsyncProducer tuned for throughput: only
// the partition leader has to acknowledge, payloads are Snappy-compressed, and
// buffered messages are flushed every 500ms. Both successes and errors are
// reported on the producer's channels.
func newAsyncProducer(brokers []string) (sarama.AsyncProducer, error) {
    cfg := sarama.NewConfig()

    // Work through a pointer to the producer section to keep the lines short.
    p := &cfg.Producer
    p.RequiredAcks = sarama.WaitForLocal
    p.Compression = sarama.CompressionSnappy
    p.Flush.Frequency = 500 * time.Millisecond
    p.Return.Successes = true
    p.Return.Errors = true

    asyncProducer, err := sarama.NewAsyncProducer(brokers, cfg)
    return asyncProducer, err
}

// runAsyncProducer starts a goroutine that logs delivery results and then
// enqueues 100 demo messages on the "events" topic.
//
// The function returns as soon as the messages are enqueued; delivery results
// arrive asynchronously. The logging goroutine ends once the producer has been
// closed (both result channels closed), so it is not leaked and never
// dereferences the nil value a closed channel yields.
func runAsyncProducer(producer sarama.AsyncProducer) {
    // Drain successes and errors until the producer closes both channels.
    go func() {
        successes, failures := producer.Successes(), producer.Errors()
        for successes != nil || failures != nil {
            select {
            case msg, ok := <-successes:
                if !ok {
                    // A nil channel is never selected, which removes the
                    // closed channel from the loop.
                    successes = nil
                    continue
                }
                log.Printf("OK: partition=%d offset=%d", msg.Partition, msg.Offset)
            case err, ok := <-failures:
                if !ok {
                    failures = nil
                    continue
                }
                log.Printf("ERROR: %v", err)
            }
        }
    }()

    // Enqueue the messages.
    for i := 0; i < 100; i++ {
        producer.Input() <- &sarama.ProducerMessage{
            Topic: "events",
            Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
            Value: sarama.StringEncoder(fmt.Sprintf(`{"seq":%d}`, i)),
        }
    }
}

Consumer — Membaca Pesan #

Consumer Group — Cara yang Direkomendasikan #

Consumer group adalah cara idiomatic Kafka — banyak consumer berbagi beban membaca partisi:

// OrderEventHandler is the consumer-group handler for order events. Its
// Setup, Cleanup and ConsumeClaim methods are the handler interface that has
// to be implemented.
type OrderEventHandler struct {
    // db is not referenced by the methods visible in this snippet; presumably
    // it is meant for persisting orders — confirm against the real handler.
    db *sql.DB
}

// Setup is called when a consumer session starts. It only logs the event and
// always returns nil.
func (h *OrderEventHandler) Setup(sarama.ConsumerGroupSession) error {
    log.Println("Consumer group session dimulai")
    return nil
}

// Cleanup is called when a consumer session ends. It only logs the event and
// always returns nil.
func (h *OrderEventHandler) Cleanup(sarama.ConsumerGroupSession) error {
    log.Println("Consumer group session berakhir")
    return nil
}

// ConsumeClaim processes the messages of a single partition claim.
//
// It returns nil when the claim's message channel is closed or when the
// session context is cancelled. Returning promptly on cancellation matters:
// once a rebalance starts, the session has to finish within the configured
// rebalance timeout.
func (h *OrderEventHandler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    for {
        select {
        case <-session.Context().Done():
            return nil
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }

            if err := h.processMessage(msg); err != nil {
                log.Printf("Error proses pesan offset=%d: %v", msg.Offset, err)
                // The failed message is not marked. Note that marking any
                // later message moves the committed offset past it, so it is
                // skipped rather than redelivered. Return the error here (or
                // route the message to a dead-letter topic) if the retry
                // strategy requires redelivery.
                continue
            }

            // Mark the message as processed. MarkMessage only buffers the
            // offset; it does not flush the commit immediately.
            session.MarkMessage(msg, "")
        }
    }
}

// processMessage logs the message coordinates, decodes the JSON payload into
// an OrderEvent and hands it to handleOrderEvent. A payload that cannot be
// decoded is reported as a wrapped "unmarshal" error.
func (h *OrderEventHandler) processMessage(msg *sarama.ConsumerMessage) error {
    key := string(msg.Key)
    log.Printf("Proses: topic=%s partition=%d offset=%d key=%s",
        msg.Topic, msg.Partition, msg.Offset, key)

    event := OrderEvent{}
    decodeErr := json.Unmarshal(msg.Value, &event)
    if decodeErr != nil {
        return fmt.Errorf("unmarshal: %w", decodeErr)
    }

    return h.handleOrderEvent(event)
}

// handleOrderEvent logs one line per event according to its type. Unknown
// types are logged as such. It always returns nil.
func (h *OrderEventHandler) handleOrderEvent(event OrderEvent) error {
    switch orderID := event.OrderID; event.Type {
    case "order.created":
        // Placeholder for persisting the order, sending notifications, etc.
        log.Printf("Order baru: %s, total Rp%.0f", orderID, event.Total)
    case "order.paid":
        log.Printf("Order dibayar: %s", orderID)
    case "order.shipped":
        log.Printf("Order dikirim: %s", orderID)
    default:
        log.Printf("Event tidak dikenal: %s", event.Type)
    }

    return nil
}

// runConsumerGroup joins the consumer group groupID on the given brokers and
// consumes topics with an OrderEventHandler until ctx is cancelled or the
// group is closed.
//
// It returns nil on a clean shutdown (context cancelled or consumer group
// closed) and a wrapped error when the group cannot be created or consuming
// fails for any other reason.
func runConsumerGroup(ctx context.Context, brokers []string, groupID string, topics []string) error {
    config := sarama.NewConfig()
    config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
        sarama.NewBalanceStrategyRoundRobin(),
    }
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    // Auto-commit marked offsets every second (the default interval).
    config.Consumer.Offsets.AutoCommit.Enable = true
    config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

    client, err := sarama.NewConsumerGroup(brokers, groupID, config)
    if err != nil {
        return fmt.Errorf("buat consumer group: %w", err)
    }
    // Best-effort close on the way out; the consume result is what callers
    // care about, so the close error is intentionally not reported.
    defer client.Close()

    handler := &OrderEventHandler{}

    for {
        // Consume blocks for the lifetime of one session and returns when the
        // session ends (for example on a rebalance), so it must be called in
        // a loop.
        err := client.Consume(ctx, topics, handler)

        // A cancelled context means the caller asked us to stop. Check this
        // first so a cancellation surfacing as an error from Consume is
        // treated as a clean shutdown, not as a failure.
        if ctx.Err() != nil {
            return nil
        }
        if err != nil {
            if errors.Is(err, sarama.ErrClosedConsumerGroup) {
                return nil
            }
            return fmt.Errorf("consume: %w", err)
        }
    }
}

Delivery Semantics #

Kafka mendukung tiga tingkat jaminan pengiriman:

AT-MOST-ONCE (paling cepat, bisa kehilangan pesan):
  Producer: acks=0 (tidak tunggu konfirmasi)
  Consumer: commit offset SEBELUM proses pesan

AT-LEAST-ONCE (default, bisa duplikat):
  Producer: acks=all, retries > 0
  Consumer: commit offset SETELAH berhasil proses pesan

EXACTLY-ONCE (paling aman, paling kompleks):
  Producer: idempotent=true + transactional
  Consumer: read_committed + manual commit dalam transaksi

At-Least-Once dengan Manual Commit #

// Disable auto-commit; offsets are committed manually after processing.
config.Consumer.Offsets.AutoCommit.Enable = false

// ConsumeClaim processes one partition claim with at-least-once semantics:
// an offset is marked and committed only after its message was processed
// successfully.
//
// On a processing failure the method returns the error instead of moving on.
// Continuing would let the next successful message commit a higher offset,
// which silently skips the failed one. Returning ends the claim, and the
// caller's Consume loop then resumes from the last committed offset, so the
// failed message is delivered again. A message that can never be processed
// will therefore be retried indefinitely; add a dead-letter path if that is a
// concern.
//
// It returns nil when the message channel is closed or the session context is
// cancelled (for example during a rebalance).
func (h *Handler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    for {
        select {
        case <-session.Context().Done():
            return nil
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }

            if err := h.process(msg); err != nil {
                // Logged here because the returned error may not be surfaced
                // anywhere unless the consumer's error reporting is enabled.
                log.Printf("GAGAL proses offset=%d, akan diproses ulang: %v", msg.Offset, err)
                return err
            }

            // Mark only after success, then commit right away instead of
            // waiting for an interval.
            session.MarkMessage(msg, "")
            session.Commit()
        }
    }
}

Confluent Kafka Go — Alternatif Berperforma Tinggi #

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

// newConfluentProducer creates a confluent-kafka-go producer for the given
// bootstrap servers with acks=all, five retries and idempotence enabled.
func newConfluentProducer(brokers string) (*kafka.Producer, error) {
    settings := kafka.ConfigMap{
        "bootstrap.servers":  brokers,
        "acks":               "all",
        "retries":            5,
        "enable.idempotence": true,
    }
    return kafka.NewProducer(&settings)
}

// produce sends one message to topic and blocks until its delivery report
// arrives on a private channel.
//
// It returns the enqueue error from Produce, or the per-message delivery
// error carried by the report; on success the partition and offset are logged.
func produce(p *kafka.Producer, topic, key string, value []byte) error {
    reports := make(chan kafka.Event)

    record := &kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte(key),
        Value: value,
    }
    if err := p.Produce(record, reports); err != nil {
        return err
    }

    // Wait for the delivery report of the message just enqueued.
    report := (<-reports).(*kafka.Message)
    if deliveryErr := report.TopicPartition.Error; deliveryErr != nil {
        return deliveryErr
    }

    log.Printf("Delivered: partition=%d offset=%d",
        report.TopicPartition.Partition, report.TopicPartition.Offset)
    return nil
}

// newConfluentConsumer creates a confluent-kafka-go consumer that joins
// groupID on the given bootstrap servers. Auto-commit is disabled, the offset
// reset policy is "earliest", and the maximum poll interval is 300000 ms.
func newConfluentConsumer(brokers, groupID string) (*kafka.Consumer, error) {
    settings := kafka.ConfigMap{
        "bootstrap.servers":    brokers,
        "group.id":             groupID,
        "auto.offset.reset":    "earliest",
        "enable.auto.commit":   false,
        "max.poll.interval.ms": 300000,
    }
    return kafka.NewConsumer(&settings)
}

// consumeMessages subscribes c to topics and processes messages until ctx is
// cancelled, committing each message's offset after it has been processed.
//
// It returns nil when ctx is cancelled, the subscribe error if subscribing
// fails, and any read error other than a poll timeout. Processing and commit
// failures are logged and the loop continues.
//
// NOTE(review): ctx is the last parameter here; Go convention puts it first,
// but the signature is kept as is for existing callers.
func consumeMessages(c *kafka.Consumer, topics []string, ctx context.Context) error {
    if err := c.SubscribeTopics(topics, nil); err != nil {
        return err
    }

    for {
        if ctx.Err() != nil {
            return nil
        }

        // Poll with a short timeout so cancellation is noticed quickly.
        msg, err := c.ReadMessage(100 * time.Millisecond)
        if err != nil {
            // Checked assertion: an error that is not a kafka.Error must not
            // panic the consumer loop.
            if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                continue
            }
            return err
        }

        log.Printf("Received: %s [%d] @ %d\n",
            *msg.TopicPartition.Topic, msg.TopicPartition.Partition,
            msg.TopicPartition.Offset)

        // Process the message. A failed message is not committed, but a later
        // successful commit on the same partition moves the offset past it.
        if err := processMessage(msg.Value); err != nil {
            log.Printf("Error: %v", err)
            continue
        }

        // Manual commit. A failed commit is logged rather than dropped: the
        // message was processed but may be delivered again after a restart.
        if _, err := c.CommitMessage(msg); err != nil {
            log.Printf("Commit error: %v", err)
        }
    }
}

Contoh Program Lengkap — Event-Driven Order System #

package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/IBM/sarama"
)

// Connection settings shared by the demo producer and consumer.
const (
    // brokerAddr is the single Kafka broker the program connects to.
    brokerAddr  = "localhost:9092"
    // topicOrders is the topic order events are published to and consumed from.
    topicOrders = "orders"
    // groupID is the consumer group the demo consumer joins.
    groupID     = "order-processor"
)

// OrderEvent is the JSON payload exchanged on the orders topic. Type carries
// the event name (for example "order.created"), and OrderID is also used as
// the Kafka message key when the event is published.
type OrderEvent struct {
    EventID    string    `json:"event_id"`
    Type       string    `json:"type"`
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Total      float64   `json:"total"`
    Items      []string  `json:"items"`
    Timestamp  time.Time `json:"timestamp"`
}

// ── Producer ──────────────────────────────────────────────────

// EventProducer publishes OrderEvent values to Kafka through a synchronous
// Sarama producer.
type EventProducer struct {
    producer sarama.SyncProducer
}

// NewEventProducer connects a synchronous, idempotent producer to brokers and
// wraps it in an EventProducer. Every replica must acknowledge a message and a
// failed send is retried up to five times. The error from creating the Sarama
// producer is returned unchanged.
func NewEventProducer(brokers []string) (*EventProducer, error) {
    conf := sarama.NewConfig()
    conf.Net.MaxOpenRequests = 1
    conf.Producer.Idempotent = true
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Retry.Max = 5
    conf.Producer.Return.Successes = true

    syncProducer, err := sarama.NewSyncProducer(brokers, conf)
    if err != nil {
        return nil, err
    }

    ep := &EventProducer{producer: syncProducer}
    return ep, nil
}

// Publish JSON-encodes event and sends it to the orders topic, logging the
// partition and offset it was written to.
//
// The order ID is the message key, so all events of one order go to the same
// partition. Encoding errors are returned as is; send errors are wrapped with
// a "publish" prefix.
func (ep *EventProducer) Publish(event OrderEvent) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    record := &sarama.ProducerMessage{
        Topic:     topicOrders,
        Key:       sarama.StringEncoder(event.OrderID),
        Value:     sarama.ByteEncoder(payload),
        Timestamp: event.Timestamp,
    }

    partition, offset, sendErr := ep.producer.SendMessage(record)
    if sendErr != nil {
        return fmt.Errorf("publish: %w", sendErr)
    }

    log.Printf("[PRODUCER] event=%s order=%s → partition=%d offset=%d",
        event.Type, event.OrderID, partition, offset)
    return nil
}

// Close closes the underlying Sarama producer and returns its error.
func (ep *EventProducer) Close() error {
    return ep.producer.Close()
}

// ── Consumer ──────────────────────────────────────────────────

// orderHandler is the consumer-group handler of the demo program. It counts
// the events it has processed.
type orderHandler struct {
    // mu guards processed: ConsumeClaim runs once per claimed partition, each
    // in its own goroutine, so the counter can be updated concurrently.
    mu        sync.Mutex
    processed int
}

// Setup and Cleanup are no-ops; they exist to satisfy the handler interface.
func (h *orderHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *orderHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim handles the messages of one partition claim. It returns nil
// when the claim's message channel is closed or the session context is
// cancelled, so the session can end promptly on a rebalance or shutdown.
func (h *orderHandler) ConsumeClaim(
    sess sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    for {
        select {
        case <-sess.Context().Done():
            return nil
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }
            h.handle(sess, msg)
        }
    }
}

// handle decodes a single message, simulates its processing and marks it as
// consumed. Undecodable payloads are logged and marked as well, so a malformed
// message is skipped instead of being redelivered forever.
func (h *orderHandler) handle(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
    var event OrderEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        log.Printf("[CONSUMER] ERROR unmarshal offset=%d: %v", msg.Offset, err)
        sess.MarkMessage(msg, "")
        return
    }

    log.Printf("[CONSUMER] partition=%d offset=%d type=%s order=%s",
        msg.Partition, msg.Offset, event.Type, event.OrderID)

    // Simulated processing.
    switch event.Type {
    case "order.created":
        fmt.Printf("  → Simpan order %s ke DB (total Rp%.0f)\n",
            event.OrderID, event.Total)
    case "order.paid":
        fmt.Printf("  → Tandai order %s sebagai PAID\n", event.OrderID)
    case "order.shipped":
        fmt.Printf("  → Update tracking order %s\n", event.OrderID)
    }

    h.mu.Lock()
    h.processed++
    h.mu.Unlock()

    sess.MarkMessage(msg, "")
}

// ── Main ──────────────────────────────────────────────────────

// main runs the demo: it starts a consumer-group goroutine on the orders
// topic, publishes four order events, then waits for a shutdown signal or a
// five-second timeout.
//
// Shutdown order: cancel the context, wait for the consumer goroutine to
// finish, close the producer, close the consumer group. The processed counter
// is read only after the consumer has stopped, so the read does not race with
// the handler.
func main() {
    brokers := []string{brokerAddr}
    ctx, cancel := context.WithCancel(context.Background())

    // Catch termination signals for a graceful shutdown.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    // Consumer configuration: start from the oldest offset, auto-commit marks.
    cfg := sarama.NewConfig()
    cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
    cfg.Consumer.Offsets.AutoCommit.Enable = true

    group, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
    if err != nil {
        log.Fatal("Consumer group:", err)
    }

    handler := &orderHandler{}

    // consumerDone is closed when the consumer goroutine has exited.
    consumerDone := make(chan struct{})
    go func() {
        defer close(consumerDone)
        for {
            // Consume returns whenever a session ends (for example on a
            // rebalance), so it is called in a loop.
            err := group.Consume(ctx, []string{topicOrders}, handler)
            if ctx.Err() != nil {
                return
            }
            if err != nil {
                if errors.Is(err, sarama.ErrClosedConsumerGroup) {
                    return
                }
                log.Printf("Consume error: %v", err)

                // Back off briefly so a persistent failure does not turn
                // into a hot retry loop; stop waiting if shutdown begins.
                select {
                case <-ctx.Done():
                    return
                case <-time.After(time.Second):
                }
            }
        }
    }()

    // Producer: publish a few events.
    producer, err := NewEventProducer(brokers)
    if err != nil {
        log.Fatal("Producer:", err)
    }

    events := []OrderEvent{
        {
            EventID: "evt-001", Type: "order.created",
            OrderID: "ORD-2024-001", CustomerID: "CUST-42",
            Total: 1_500_000, Items: []string{"Laptop", "Mouse"},
            Timestamp: time.Now(),
        },
        {
            EventID: "evt-002", Type: "order.paid",
            OrderID: "ORD-2024-001", CustomerID: "CUST-42",
            Total: 1_500_000, Timestamp: time.Now().Add(5 * time.Second),
        },
        {
            EventID: "evt-003", Type: "order.created",
            OrderID: "ORD-2024-002", CustomerID: "CUST-99",
            Total: 350_000, Items: []string{"Keyboard"},
            Timestamp: time.Now().Add(10 * time.Second),
        },
        {
            EventID: "evt-004", Type: "order.shipped",
            OrderID: "ORD-2024-001", CustomerID: "CUST-42",
            Total: 1_500_000, Timestamp: time.Now().Add(15 * time.Second),
        },
    }

    fmt.Println("=== Mengirim Events ke Kafka ===")
    for _, evt := range events {
        if err := producer.Publish(evt); err != nil {
            log.Printf("Gagal publish: %v", err)
        }
        time.Sleep(200 * time.Millisecond)
    }

    // Wait for the consumer to work through the events, or for a signal.
    timedOut := false
    select {
    case <-sigCh:
        fmt.Println("\nMenerima sinyal shutdown...")
    case <-time.After(5 * time.Second):
        timedOut = true
    }

    // Stop the consumer first and wait for it, then release the clients.
    cancel()
    <-consumerDone

    if timedOut {
        fmt.Printf("\nTimeout. Consumer memproses %d pesan.\n", handler.processed)
    }

    if err := producer.Close(); err != nil {
        log.Printf("Close producer: %v", err)
    }
    if err := group.Close(); err != nil {
        log.Printf("Close consumer group: %v", err)
    }
    fmt.Println("Selesai.")
}

Ringkasan #

  • Kafka menyimpan pesan dalam log yang bisa di-replay — berbeda dari RabbitMQ yang menghapus setelah dikonsumsi.
  • Consumer group adalah cara idiomatic — banyak consumer berbagi partisi; dalam satu group, satu partisi hanya dibaca oleh satu consumer pada satu waktu.
  • Key yang sama selalu masuk ke partisi yang sama — penting untuk menjaga urutan event per entity (order, user).
  • Loop client.Consume harus dijalankan karena akan return saat rebalance terjadi.
  • Manual commit (AutoCommit.Enable = false) untuk at-least-once yang aman — commit hanya setelah berhasil proses.
  • Idempotent producer (Idempotent = true) mencegah duplikat saat retry.
  • Async producer untuk throughput tinggi — handle success/error via channel terpisah.
  • Compression (Snappy, LZ4, Zstd) untuk mengurangi bandwidth — sangat berguna untuk payload besar.
  • OffsetOldest untuk membaca dari awal; OffsetNewest untuk hanya pesan baru.
  • Graceful shutdown: cancel context → tunggu consumer selesai → close producer → close consumer group.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact