Kafka #
Apache Kafka adalah distributed event streaming platform yang dirancang untuk menangani jutaan event per detik dengan latensi sangat rendah. Berbeda dari message broker tradisional seperti RabbitMQ yang menghapus pesan setelah dikonsumsi, Kafka menyimpan semua event dalam log yang bisa di-replay — sangat berguna untuk audit trail, event sourcing, dan stream processing. Go mendukung Kafka melalui dua library populer: github.com/IBM/sarama (pure Go, lebih fleksibel) dan github.com/confluentinc/confluent-kafka-go (wrapper librdkafka, performa lebih tinggi).
Konsep Dasar Kafka #
Topic → kategori pesan (seperti "channel" atau "queue name")
Partition → subdivisi topic untuk parallelism; setiap topic punya 1+ partisi
Offset → posisi unik setiap pesan dalam partisi (monotonically increasing)
Producer → aplikasi yang menulis pesan ke topic
Consumer → aplikasi yang membaca pesan dari topic
Consumer Group → sekelompok consumer yang bekerja sama membaca topic;
setiap partisi hanya dibaca oleh satu consumer dalam group
Broker → server Kafka; cluster terdiri dari banyak broker
Instalasi #
# Sarama — pure Go, more flexible
go get github.com/IBM/sarama
# Or Confluent — requires the librdkafka C library
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
Producer — Mengirim Pesan #
Sarama Sync Producer #
import (
"github.com/IBM/sarama"
"encoding/json"
)
// newSyncProducer creates a Sarama synchronous producer connected to brokers.
//
// The producer is configured to wait for acknowledgement from all replicas,
// to retry a failed send up to 5 times, and to run in idempotent mode so that
// a retried send does not create a duplicate record.
func newSyncProducer(brokers []string) (sarama.SyncProducer, error) {
	cfg := sarama.NewConfig()

	// Idempotent mode; it requires a single in-flight request per connection.
	cfg.Producer.Idempotent = true
	cfg.Net.MaxOpenRequests = 1

	cfg.Producer.RequiredAcks = sarama.WaitForAll // wait for all replicas
	cfg.Producer.Retry.Max = 5
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, cfg)
	return producer, err
}
// sendMessage JSON-encodes value and publishes it to topic under key using the
// synchronous producer, then logs the partition and offset it was written to.
//
// It returns a wrapped error if encoding ("marshal: ...") or sending
// ("send: ...") fails.
func sendMessage(producer sarama.SyncProducer, topic string, key string, value interface{}) error {
	payload, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	// Record headers attached to every message sent through this helper.
	headers := []sarama.RecordHeader{
		{Key: []byte("content-type"), Value: []byte("application/json")},
		{Key: []byte("source"), Value: []byte("myapp")},
	}
	record := &sarama.ProducerMessage{
		Topic:   topic,
		Key:     sarama.StringEncoder(key),
		Value:   sarama.ByteEncoder(payload),
		Headers: headers,
	}

	part, off, sendErr := producer.SendMessage(record)
	if sendErr != nil {
		return fmt.Errorf("send: %w", sendErr)
	}

	log.Printf("Pesan terkirim ke partition=%d offset=%d", part, off)
	return nil
}
Sarama Async Producer — Throughput Tinggi #
// newAsyncProducer creates a Sarama asynchronous producer connected to brokers.
//
// The configuration waits only for the local (leader) acknowledgement,
// compresses batches with Snappy, flushes every 500ms, and enables both the
// Successes and Errors channels, so the caller has to read from both.
func newAsyncProducer(brokers []string) (sarama.AsyncProducer, error) {
	cfg := sarama.NewConfig()

	// Delivery reporting: both result channels are enabled.
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	// Throughput-oriented settings.
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Compression = sarama.CompressionSnappy
	cfg.Producer.Flush.Frequency = 500 * time.Millisecond

	producer, err := sarama.NewAsyncProducer(brokers, cfg)
	return producer, err
}
// runAsyncProducer starts a background goroutine that logs delivery results
// from producer, then enqueues 100 sample messages on the "events" topic.
//
// The function returns as soon as the messages are enqueued; it neither waits
// for delivery nor closes the producer, which remains the caller's job. The
// logging goroutine ends once both the Successes and Errors channels have been
// closed (Sarama closes them when the producer is shut down), so it no longer
// spins or dereferences a nil message after the producer is closed.
func runAsyncProducer(producer sarama.AsyncProducer) {
	// Drain success and error reports until both channels are closed.
	go func() {
		successes, errs := producer.Successes(), producer.Errors()
		for successes != nil || errs != nil {
			select {
			case msg, ok := <-successes:
				if !ok {
					// A nil channel is never selected again.
					successes = nil
					continue
				}
				log.Printf("OK: partition=%d offset=%d", msg.Partition, msg.Offset)
			case err, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				log.Printf("ERROR: %v", err)
			}
		}
	}()

	// Enqueue the messages.
	for i := 0; i < 100; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "events",
			Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
			Value: sarama.StringEncoder(fmt.Sprintf(`{"seq":%d}`, i)),
		}
	}
}
Consumer — Membaca Pesan #
Consumer Group — Cara yang Direkomendasikan #
Consumer group adalah cara idiomatic Kafka — banyak consumer berbagi beban membaca partisi:
// OrderEventHandler is the type on which the consumer-group handler methods
// (Setup, Cleanup, ConsumeClaim) are implemented.
type OrderEventHandler struct {
// db is a database handle; it is not referenced by the methods visible in
// this section.
db *sql.DB
}
// Setup is called when a consumer session starts. It only logs the event and
// always returns nil.
func (h *OrderEventHandler) Setup(_ sarama.ConsumerGroupSession) error {
	const started = "Consumer group session dimulai"
	log.Println(started)
	return nil
}
// Cleanup is called when a consumer session ends. It only logs the event and
// always returns nil.
func (h *OrderEventHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	const ended = "Consumer group session berakhir"
	log.Println(ended)
	return nil
}
// ConsumeClaim processes the messages of a single claimed partition.
//
// Each message is passed to processMessage; on success its offset is marked
// for commit. The loop ends, returning nil, when the claim's message channel
// is closed or when the session context is done. Honouring the session context
// lets the handler exit promptly when a rebalance starts instead of holding
// the session open.
//
// A message that fails processing is logged and skipped. Note that marking a
// later message also moves the committed position past the skipped one, so a
// failed message is NOT redelivered by this handler; use a retry or
// dead-letter strategy if that matters.
func (h *OrderEventHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for {
		select {
		case <-session.Context().Done():
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			if err := h.processMessage(msg); err != nil {
				log.Printf("Error proses pesan offset=%d: %v", msg.Offset, err)
				continue
			}
			// MarkMessage only buffers the offset; it is flushed by the
			// commit mechanism, not immediately.
			session.MarkMessage(msg, "")
		}
	}
}
// processMessage logs the message coordinates, decodes the JSON payload into
// an OrderEvent and hands it to handleOrderEvent.
//
// It returns a wrapped "unmarshal: ..." error when the payload is not valid
// JSON for OrderEvent, otherwise whatever handleOrderEvent returns.
func (h *OrderEventHandler) processMessage(msg *sarama.ConsumerMessage) error {
	key := string(msg.Key)
	log.Printf("Proses: topic=%s partition=%d offset=%d key=%s",
		msg.Topic, msg.Partition, msg.Offset, key)

	decoded := OrderEvent{}
	err := json.Unmarshal(msg.Value, &decoded)
	if err != nil {
		return fmt.Errorf("unmarshal: %w", err)
	}

	return h.handleOrderEvent(decoded)
}
// handleOrderEvent dispatches on the event type and logs one line per event.
// Unknown types are logged as such. It always returns nil.
func (h *OrderEventHandler) handleOrderEvent(event OrderEvent) error {
	orderID := event.OrderID

	switch kind := event.Type; kind {
	case "order.shipped":
		log.Printf("Order dikirim: %s", orderID)
	case "order.paid":
		log.Printf("Order dibayar: %s", orderID)
	case "order.created":
		// Placeholder for persisting the order, sending notifications, etc.
		log.Printf("Order baru: %s, total Rp%.0f", orderID, event.Total)
	default:
		log.Printf("Event tidak dikenal: %s", kind)
	}

	return nil
}
// runConsumerGroup joins the consumer group groupID on brokers and consumes
// topics with an OrderEventHandler until ctx is cancelled or the group is
// closed.
//
// It returns nil on a clean shutdown (ctx cancelled or group closed), and a
// wrapped error if the group cannot be created or Consume fails for another
// reason.
func runConsumerGroup(ctx context.Context, brokers []string, groupID string, topics []string) error {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRoundRobin(),
	}
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	// Auto-commit marked offsets every 1 second (the default).
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	client, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		return fmt.Errorf("buat consumer group: %w", err)
	}
	defer func() {
		// Surface close failures instead of silently dropping them.
		if cerr := client.Close(); cerr != nil {
			log.Printf("tutup consumer group: %v", cerr)
		}
	}()

	handler := &OrderEventHandler{}
	for {
		// Consume blocks for the lifetime of one group session and returns
		// when the session ends (for example on a rebalance), so it has to be
		// called again in a loop.
		err := client.Consume(ctx, topics, handler)

		// Cancellation is a clean shutdown even if Consume surfaced it as an
		// error (e.g. cancelled while joining the group).
		if ctx.Err() != nil {
			return nil
		}
		if err != nil {
			if errors.Is(err, sarama.ErrClosedConsumerGroup) {
				return nil
			}
			return fmt.Errorf("consume: %w", err)
		}
	}
}
Delivery Semantics #
Kafka mendukung tiga tingkat jaminan pengiriman:
AT-MOST-ONCE (paling cepat, bisa kehilangan pesan):
Producer: acks=0 (tidak tunggu konfirmasi)
Consumer: commit offset SEBELUM proses pesan
AT-LEAST-ONCE (default, bisa duplikat):
Producer: acks=all, retry=true
Consumer: commit offset SETELAH berhasil proses pesan
EXACTLY-ONCE (paling aman, paling kompleks):
Producer: idempotent=true + transactional
Consumer: read_committed + manual commit dalam transaksi
At-Least-Once dengan Manual Commit #
// Disable auto-commit; offsets are committed manually after processing.
config.Consumer.Offsets.AutoCommit.Enable = false
// ConsumeClaim processes one claimed partition with at-least-once semantics:
// an offset is marked and committed only after its message was processed.
//
// When processing fails the method stops and returns the error instead of
// moving on. Continuing would let the next successful message mark and commit
// a higher offset, which silently commits past the failed one so it would
// never be redelivered. Returning ends the session; when the caller's Consume
// loop re-joins, consumption resumes from the last committed offset, i.e. at
// the failed message. A message that always fails will therefore be retried
// indefinitely unless the caller adds a retry limit or dead-letter handling.
//
// The method also returns nil when the claim's channel is closed or the
// session context is done, so it exits promptly on rebalance.
func (h *Handler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for {
		select {
		case <-session.Context().Done():
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			if err := h.process(msg); err != nil {
				log.Printf("GAGAL proses offset=%d, akan diproses ulang", msg.Offset)
				return fmt.Errorf("process offset %d: %w", msg.Offset, err)
			}
			// Mark only on success, then commit right away rather than
			// waiting for a commit interval.
			session.MarkMessage(msg, "")
			session.Commit()
		}
	}
}
Confluent Kafka Go — Alternatif Berperforma Tinggi #
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
// newConfluentProducer creates a confluent-kafka-go producer for the given
// bootstrap server list, with acks=all, 5 retries and idempotence enabled.
func newConfluentProducer(brokers string) (*kafka.Producer, error) {
	settings := kafka.ConfigMap{
		"bootstrap.servers":  brokers,
		"enable.idempotence": true,
		"acks":               "all",
		"retries":            5,
	}
	producer, err := kafka.NewProducer(&settings)
	return producer, err
}
// produce sends one message with the given key and value to topic and blocks
// until its delivery report arrives on a private delivery channel.
//
// It returns the enqueue error from Produce, the delivery error carried by the
// report, or an error if the delivery event is not a *kafka.Message (the
// original unchecked type assertion would panic in that case). On success the
// partition and offset are logged.
func produce(p *kafka.Producer, topic, key string, value []byte) error {
	deliveryCh := make(chan kafka.Event)
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny, // let the partitioner choose
		},
		Key:   []byte(key),
		Value: value,
	}, deliveryCh)
	if err != nil {
		return err
	}

	// Wait for the delivery report of this message.
	e := <-deliveryCh
	m, ok := e.(*kafka.Message)
	if !ok {
		return fmt.Errorf("unexpected delivery event %T: %v", e, e)
	}
	if m.TopicPartition.Error != nil {
		return m.TopicPartition.Error
	}
	log.Printf("Delivered: partition=%d offset=%d",
		m.TopicPartition.Partition, m.TopicPartition.Offset)
	return nil
}
// newConfluentConsumer creates a confluent-kafka-go consumer in group groupID.
//
// It starts from the earliest offset when the group has no committed position,
// disables auto-commit (the caller commits manually) and sets
// max.poll.interval.ms to 300000.
func newConfluentConsumer(brokers, groupID string) (*kafka.Consumer, error) {
	settings := kafka.ConfigMap{
		"group.id":             groupID,
		"bootstrap.servers":    brokers,
		"enable.auto.commit":   false,
		"auto.offset.reset":    "earliest",
		"max.poll.interval.ms": 300000,
	}
	consumer, err := kafka.NewConsumer(&settings)
	return consumer, err
}
// consumeMessages subscribes c to topics and processes messages until ctx is
// cancelled (returns nil) or a non-timeout read error occurs (returned as is).
//
// Each message is logged, passed to processMessage and, on success, committed
// manually. Read timeouts are expected and simply poll again. The error is
// inspected with errors.As rather than an unchecked type assertion, so an
// error that is not a kafka.Error is returned instead of causing a panic.
// Commit failures are logged rather than ignored.
//
// NOTE(review): a message whose processing fails is skipped; the commit of a
// later message also moves the group position past it.
func consumeMessages(c *kafka.Consumer, topics []string, ctx context.Context) error {
	if err := c.SubscribeTopics(topics, nil); err != nil {
		return err
	}
	for {
		// Non-blocking cancellation check before each poll.
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		msg, err := c.ReadMessage(100 * time.Millisecond)
		if err != nil {
			var kerr kafka.Error
			if errors.As(err, &kerr) && kerr.Code() == kafka.ErrTimedOut {
				continue
			}
			return err
		}
		log.Printf("Received: %s [%d] @ %d\n",
			*msg.TopicPartition.Topic, msg.TopicPartition.Partition,
			msg.TopicPartition.Offset)

		if err := processMessage(msg.Value); err != nil {
			log.Printf("Error: %v", err)
			continue
		}

		// Manual commit after successful processing.
		if _, err := c.CommitMessage(msg); err != nil {
			log.Printf("Commit error: %v", err)
		}
	}
}
Contoh Program Lengkap — Event-Driven Order System #
package main
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)
// Connection settings used by the example program.
const (
// brokerAddr is the address of the Kafka broker to connect to.
brokerAddr = "localhost:9092"
// topicOrders is the topic order events are published to and consumed from.
topicOrders = "orders"
// groupID is the consumer group ID used by the consumer.
groupID = "order-processor"
)
// OrderEvent is the JSON payload exchanged on the orders topic.
type OrderEvent struct {
// EventID identifies the event itself.
EventID string `json:"event_id"`
// Type is the event kind; the handlers in this file recognise
// "order.created", "order.paid" and "order.shipped".
Type string `json:"type"`
// OrderID identifies the order; Publish also uses it as the message key.
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
// Total is the order amount; the handlers print it with an "Rp" prefix.
Total float64 `json:"total"`
Items []string `json:"items"`
// Timestamp is copied into the Kafka message timestamp by Publish.
Timestamp time.Time `json:"timestamp"`
}
// ── Producer ──────────────────────────────────────────────────
// EventProducer publishes OrderEvent values through a Sarama synchronous
// producer.
type EventProducer struct {
// producer is the underlying Sarama sync producer.
producer sarama.SyncProducer
}
// NewEventProducer builds an EventProducer backed by an idempotent Sarama
// sync producer (acks from all replicas, up to 5 retries, one in-flight
// request). It returns the error from sarama.NewSyncProducer unchanged.
func NewEventProducer(brokers []string) (*EventProducer, error) {
	conf := sarama.NewConfig()
	conf.Net.MaxOpenRequests = 1
	conf.Producer.Idempotent = true
	conf.Producer.Return.Successes = true
	conf.Producer.Retry.Max = 5
	conf.Producer.RequiredAcks = sarama.WaitForAll

	sp, err := sarama.NewSyncProducer(brokers, conf)
	if err == nil {
		return &EventProducer{producer: sp}, nil
	}
	return nil, err
}
// Publish JSON-encodes event and sends it to the orders topic, keyed by the
// order ID, then logs the partition and offset it was written to.
//
// A marshalling error is returned unchanged; a send error is wrapped as
// "publish: ...".
func (ep *EventProducer) Publish(event OrderEvent) error {
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	record := &sarama.ProducerMessage{
		Topic: topicOrders,
		// Same key → same partition.
		Key:       sarama.StringEncoder(event.OrderID),
		Value:     sarama.ByteEncoder(payload),
		Timestamp: event.Timestamp,
	}

	partition, off, sendErr := ep.producer.SendMessage(record)
	if sendErr != nil {
		return fmt.Errorf("publish: %w", sendErr)
	}

	log.Printf("[PRODUCER] event=%s order=%s → partition=%d offset=%d",
		event.Type, event.OrderID, partition, off)
	return nil
}
// Close shuts down the underlying sync producer and returns its error, if any.
func (ep *EventProducer) Close() error {
	err := ep.producer.Close()
	return err
}
// ── Consumer ──────────────────────────────────────────────────
// orderHandler is the consumer-group handler of the example program. It
// counts the messages it has handled.
//
// ConsumeClaim runs in one goroutine per claimed partition, so the counter is
// guarded by a mutex.
type orderHandler struct {
	mu        sync.Mutex // guards processed
	processed int        // number of successfully decoded and handled messages
}

// Setup is a no-op session start hook.
func (h *orderHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

// Cleanup is a no-op session end hook.
func (h *orderHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim handles the messages of one claimed partition until the claim's
// channel is closed or the session context is done (so the handler exits
// promptly on rebalance or shutdown). It always returns nil.
//
// Payloads that cannot be decoded are logged and marked as consumed so they
// are not redelivered. Decoded events are logged, a simulated action is
// printed for known event types, the counter is incremented and the message
// is marked.
func (h *orderHandler) ConsumeClaim(
	sess sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for {
		select {
		case <-sess.Context().Done():
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}

			var event OrderEvent
			if err := json.Unmarshal(msg.Value, &event); err != nil {
				log.Printf("[CONSUMER] ERROR unmarshal offset=%d: %v", msg.Offset, err)
				// Skip the undecodable message for good.
				sess.MarkMessage(msg, "")
				continue
			}
			log.Printf("[CONSUMER] partition=%d offset=%d type=%s order=%s",
				msg.Partition, msg.Offset, event.Type, event.OrderID)

			// Simulated processing.
			switch event.Type {
			case "order.created":
				fmt.Printf("  → Simpan order %s ke DB (total Rp%.0f)\n",
					event.OrderID, event.Total)
			case "order.paid":
				fmt.Printf("  → Tandai order %s sebagai PAID\n", event.OrderID)
			case "order.shipped":
				fmt.Printf("  → Update tracking order %s\n", event.OrderID)
			}

			h.mu.Lock()
			h.processed++
			h.mu.Unlock()

			sess.MarkMessage(msg, "")
		}
	}
}
// ── Main ──────────────────────────────────────────────────────
// main runs the demo: it starts a consumer group in the background, publishes
// four sample order events, waits for SIGINT/SIGTERM or a 5 second timeout,
// and then shuts down.
//
// Shutdown order: cancel the context, wait for the consumer goroutine to
// finish, close the producer, close the consumer group. Waiting for the
// consumer before reading handler.processed also ensures the count is read
// only after all consuming goroutines have stopped.
func main() {
	brokers := []string{brokerAddr}
	ctx, cancel := context.WithCancel(context.Background())

	// Catch signals for a graceful shutdown.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// Start the consumer in its own goroutine.
	cfg := sarama.NewConfig()
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	cfg.Consumer.Offsets.AutoCommit.Enable = true
	group, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
	if err != nil {
		log.Fatal("Consumer group:", err)
	}
	handler := &orderHandler{}
	consumerDone := make(chan struct{}) // closed when the consume loop exits
	go func() {
		defer close(consumerDone)
		for {
			// Consume returns whenever a session ends, so call it in a loop.
			if err := group.Consume(ctx, []string{topicOrders}, handler); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return
				}
				log.Printf("Consume error: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	// Producer: publish a few events.
	producer, err := NewEventProducer(brokers)
	if err != nil {
		log.Fatal("Producer:", err)
	}
	events := []OrderEvent{
		{
			EventID: "evt-001", Type: "order.created",
			OrderID: "ORD-2024-001", CustomerID: "CUST-42",
			Total: 1_500_000, Items: []string{"Laptop", "Mouse"},
			Timestamp: time.Now(),
		},
		{
			EventID: "evt-002", Type: "order.paid",
			OrderID: "ORD-2024-001", CustomerID: "CUST-42",
			Total: 1_500_000, Timestamp: time.Now().Add(5 * time.Second),
		},
		{
			EventID: "evt-003", Type: "order.created",
			OrderID: "ORD-2024-002", CustomerID: "CUST-99",
			Total: 350_000, Items: []string{"Keyboard"},
			Timestamp: time.Now().Add(10 * time.Second),
		},
		{
			EventID: "evt-004", Type: "order.shipped",
			OrderID: "ORD-2024-001", CustomerID: "CUST-42",
			Total: 1_500_000, Timestamp: time.Now().Add(15 * time.Second),
		},
	}
	fmt.Println("=== Mengirim Events ke Kafka ===")
	for _, evt := range events {
		if err := producer.Publish(evt); err != nil {
			log.Printf("Gagal publish: %v", err)
		}
		time.Sleep(200 * time.Millisecond)
	}

	// Wait for the consumer to work through the events, or for a stop signal.
	timedOut := false
	select {
	case <-sigCh:
		fmt.Println("\nMenerima sinyal shutdown...")
	case <-time.After(5 * time.Second):
		timedOut = true
	}

	// Stop the consumer and wait until its goroutine has actually finished.
	cancel()
	<-consumerDone
	if timedOut {
		fmt.Printf("\nTimeout. Consumer memproses %d pesan.\n", handler.processed)
	}

	if err := producer.Close(); err != nil {
		log.Printf("Producer close: %v", err)
	}
	if err := group.Close(); err != nil {
		log.Printf("Consumer group close: %v", err)
	}
	fmt.Println("Selesai.")
}
Ringkasan #
- Kafka menyimpan pesan dalam log yang bisa di-replay — berbeda dari RabbitMQ yang menghapus setelah dikonsumsi.
- Consumer group adalah cara idiomatic — banyak consumer berbagi partisi; satu partisi hanya dibaca satu consumer per waktu.
- Key yang sama selalu masuk ke partisi yang sama — penting untuk menjaga urutan event per entity (order, user).
- Loop `client.Consume` harus dijalankan karena akan return saat rebalance terjadi.
- Manual commit (`AutoCommit.Enable = false`) untuk at-least-once yang aman — commit hanya setelah berhasil proses.
- Idempotent producer (`Idempotent = true`) mencegah duplikat saat retry.
- Async producer untuk throughput tinggi — handle success/error via channel terpisah.
- Compression (`Snappy`, `LZ4`, `Zstd`) untuk mengurangi bandwidth — sangat berguna untuk payload besar.
- `OffsetOldest` untuk membaca dari awal; `OffsetNewest` untuk hanya pesan baru.
- Graceful shutdown: cancel context → tunggu consumer selesai → close producer → close consumer group.