Goroutine #

Goroutine adalah salah satu alasan terkuat mengapa Go unggul untuk sistem concurrent. Bukan karena Go satu-satunya bahasa dengan concurrency, tapi karena goroutine membuatnya sangat mudah dan murah: sebuah goroutine hanya membutuhkan ~2KB stack saat pertama dibuat (bisa tumbuh secara dinamis), sehingga kamu bisa menjalankan ratusan ribu goroutine dalam satu proses tanpa kehabisan memory. OS thread sebaliknya membutuhkan ~1-8MB stack yang fixed. Model ini — yang disebut M:N scheduling (M goroutine dijadwalkan pada N OS thread oleh Go runtime) — adalah yang membuat Go sangat efisien untuk I/O-bound workloads seperti server HTTP yang menangani ribuan koneksi concurrent.

Cara Membuat Goroutine #

Cukup tambahkan keyword go sebelum pemanggilan fungsi:

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    fmt.Printf("Halo, %s!\n", name)
}

func main() {
    go sayHello("Budi")    // jalankan sebagai goroutine
    go sayHello("Sari")    // goroutine kedua
    go func() {            // goroutine dengan fungsi anonim
        fmt.Println("Goroutine anonim berjalan")
    }()

    // MASALAH: main() tidak menunggu goroutine selesai!
    // Jika main() exit, semua goroutine langsung dihentikan
    time.Sleep(100 * time.Millisecond)  // solusi sementara, tidak idiomatic
}

Mengapa time.Sleep Bukan Solusi yang Benar #

time.Sleep untuk menunggu goroutine adalah anti-pattern — kamu tidak tahu berapa lama goroutine perlu waktu. Solusi yang benar adalah sync.WaitGroup atau channel.


sync.WaitGroup — Menunggu Banyak Goroutine #

WaitGroup adalah counter yang memungkinkan satu goroutine menunggu sekumpulan goroutine lain selesai:

import "sync"

func prosesData(id int, wg *sync.WaitGroup) {
    defer wg.Done()  // pastikan Done() selalu dipanggil, bahkan jika panic
    fmt.Printf("Worker %d mulai\n", id)
    // ... lakukan pekerjaan
    fmt.Printf("Worker %d selesai\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)          // tambah counter sebelum go
        go prosesData(i, &wg)
    }

    wg.Wait()  // block sampai counter menjadi 0
    fmt.Println("Semua worker selesai")
}

wg.Add(1) harus dipanggil sebelum go, bukan di dalam goroutine. Jika dipanggil di dalam goroutine, ada kemungkinan wg.Wait() dipanggil sebelum Add() sehingga program langsung exit tanpa menunggu.

// ANTI-PATTERN: Add dipanggil di dalam goroutine
go func() {
    wg.Add(1)  // ✗ terlambat — Wait() mungkin sudah lewat
    defer wg.Done()
    // ...
}()

// BENAR: Add dipanggil sebelum go
wg.Add(1)
go func() {
    defer wg.Done()  // ✓
    // ...
}()

Channel — Komunikasi Antar Goroutine #

Channel adalah mekanisme untuk goroutine berkomunikasi dengan aman — “jangan berkomunikasi dengan berbagi memori; berbagi memori dengan berkomunikasi.”

Unbuffered Channel #

Unbuffered channel menyebabkan pengirim block sampai penerima siap, dan sebaliknya. Ini menjamin sinkronisasi:

ch := make(chan int)  // unbuffered

// Pengirim — goroutine
go func() {
    fmt.Println("Mengirim nilai...")
    ch <- 42           // block sampai ada yang menerima
    fmt.Println("Nilai terkirim")
}()

// Penerima — goroutine utama
nilai := <-ch          // block sampai ada yang mengirim
fmt.Println("Diterima:", nilai)

Buffered Channel #

Buffered channel punya kapasitas internal. Pengirim hanya block jika buffer penuh:

ch := make(chan int, 3)  // buffered, kapasitas 3

ch <- 1    // tidak block, masuk buffer
ch <- 2    // tidak block
ch <- 3    // tidak block
// ch <- 4  // block! buffer penuh

fmt.Println(<-ch)  // 1
fmt.Println(<-ch)  // 2
fmt.Println(<-ch)  // 3

Close dan Range pada Channel #

ch := make(chan int, 5)

// Kirim beberapa nilai
go func() {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)  // sinyal bahwa tidak ada lagi nilai yang akan dikirim
}()

// Range otomatis berhenti saat channel ditutup
for v := range ch {
    fmt.Println(v)  // 0 1 2 3 4
}

// Cek apakah channel masih terbuka
v, ok := <-ch
if !ok {
    fmt.Println("Channel sudah ditutup, nilai:", v)  // v adalah zero value
}
Jangan pernah menutup channel dari sisi penerima, dan jangan close channel yang sudah ditutup — keduanya menyebabkan panic. Konvensi: hanya pengirim yang boleh menutup channel.

Directional Channel — Channel dengan Arah #

Kamu bisa membatasi channel ke hanya send atau hanya receive untuk kejelasan intent:

func producer(ch chan<- int) {  // send-only channel
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
    // <-ch  // ← compile error: receive from send-only channel
}

func consumer(ch <-chan int) {  // receive-only channel
    for v := range ch {
        fmt.Println("Consumed:", v)
    }
    // ch <- 1  // ← compile error: send to receive-only channel
}

func main() {
    ch := make(chan int, 5)
    go producer(ch)
    consumer(ch)
}

select — Multiplexing Channel #

select memungkinkan satu goroutine menunggu beberapa operasi channel sekaligus, mengeksekusi case yang pertama siap:

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "satu"
    }()
    go func() {
        time.Sleep(500 * time.Millisecond)
        ch2 <- "dua"
    }()

    // select memilih case yang pertama siap
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Dari ch1:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Dari ch2:", msg2)
        }
    }
    // Output: Dari ch2: dua (pertama), lalu Dari ch1: satu
}

Select dengan Timeout #

func fetchData(url string) (string, error) {
    resultCh := make(chan string, 1)

    go func() {
        // simulasi HTTP request
        time.Sleep(2 * time.Second)
        resultCh <- "data dari " + url
    }()

    select {
    case result := <-resultCh:
        return result, nil
    case <-time.After(1 * time.Second):
        return "", fmt.Errorf("timeout: request ke %s terlalu lama", url)
    }
}

Select Non-Blocking dengan Default #

ch := make(chan int, 1)

// Coba kirim tanpa block
select {
case ch <- 42:
    fmt.Println("Berhasil kirim")
default:
    fmt.Println("Channel penuh, skip")
}

// Coba terima tanpa block
select {
case v := <-ch:
    fmt.Println("Terima:", v)
default:
    fmt.Println("Tidak ada nilai, skip")
}

sync.Mutex — Mutual Exclusion #

Ketika goroutine perlu mengakses shared state (bukan via channel), gunakan Mutex untuk mencegah race condition:

import "sync"

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()  // unlock pasti dipanggil
    c.count++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := &SafeCounter{}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("Final:", counter.Value())  // selalu 1000
}

sync.RWMutex — Read-Write Lock #

Untuk workload yang lebih banyak read dari write, RWMutex lebih efisien karena mengizinkan banyak reader sekaligus:

type Cache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // write lock — eksklusif
    defer c.mu.Unlock()
    c.store[key] = value
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()         // read lock — bisa concurrent dengan reader lain
    defer c.mu.RUnlock()
    v, ok := c.store[key]
    return v, ok
}

sync/atomic — Operasi Atomik #

Untuk operasi sederhana pada tipe numerik, atomic lebih ringan dari mutex karena tidak butuh lock:

import "sync/atomic"

var counter int64

// Increment atomik — thread-safe tanpa mutex
atomic.AddInt64(&counter, 1)

// Load atomik — baca nilai terkini dengan aman
val := atomic.LoadInt64(&counter)

// Store atomik
atomic.StoreInt64(&counter, 0)

// CompareAndSwap — ubah hanya jika nilai saat ini sesuai
swapped := atomic.CompareAndSwapInt64(&counter, 0, 100)
fmt.Println("Swapped:", swapped)

// Sejak Go 1.19 — atomic.Value untuk tipe apapun
var v atomic.Value
v.Store("hello")
fmt.Println(v.Load())  // "hello"

Race Condition dan Race Detector #

Race condition terjadi ketika dua goroutine mengakses variabel yang sama secara bersamaan dan minimal salah satunya menulis — tanpa sinkronisasi yang tepat:

// ANTI-PATTERN: race condition
var count int

go func() { count++ }()  // goroutine 1 baca+tulis
go func() { count++ }()  // goroutine 2 baca+tulis concurrent

// Nilai akhir count tidak dapat diprediksi!

Go menyediakan race detector yang sangat berguna:

go run -race main.go
go test -race ./...
go build -race -o myapp .

Race detector akan mencetak laporan detail saat race terdeteksi:

==================
WARNING: DATA RACE
Write at 0x00c0000b4010 by goroutine 7:
  main.main.func2()
      /home/user/main.go:12 +0x38

Previous write at 0x00c0000b4010 by goroutine 6:
  main.main.func1()
      /home/user/main.go:11 +0x38
==================

Context — Cancellation dan Timeout #

context.Context adalah cara standar Go untuk menyebarkan sinyal cancellation, deadline, dan value antar goroutine:

import "context"

func fetchUser(ctx context.Context, id int) (*User, error) {
    // Buat request yang bisa dibatalkan
    req, _ := http.NewRequestWithContext(ctx, "GET",
        fmt.Sprintf("/users/%d", id), nil)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, fmt.Errorf("fetchUser: %w", err)
    }
    defer resp.Body.Close()
    // ...
}

func main() {
    // Context dengan timeout 5 detik
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()  // selalu panggil cancel untuk bebaskan resource

    user, err := fetchUser(ctx, 42)
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            fmt.Println("Request timeout")
        }
        return
    }
    fmt.Println(user)
}

// Propagasi cancellation ke goroutine anak
func processAll(ctx context.Context, items []Item) error {
    for _, item := range items {
        // Cek apakah context sudah dibatalkan
        select {
        case <-ctx.Done():
            return ctx.Err()  // context.Canceled atau context.DeadlineExceeded
        default:
        }

        if err := process(ctx, item); err != nil {
            return err
        }
    }
    return nil
}

Goroutine Leak — Goroutine yang Tidak Pernah Berhenti #

Goroutine leak terjadi ketika goroutine dibuat tapi tidak pernah berhenti — biasanya karena menunggu channel yang tidak pernah menerima nilai atau ditutup:

// ANTI-PATTERN: goroutine leak
func doWork() <-chan int {
    ch := make(chan int)
    go func() {
        for {
            ch <- rand.Int()  // goroutine ini berjalan selamanya!
        }
    }()
    return ch
}

// BENAR: gunakan done channel atau context untuk stop
func doWorkWithStop(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for {
            select {
            case ch <- rand.Int():
            case <-ctx.Done():
                return  // berhenti saat context dibatalkan
            }
        }
    }()
    return ch
}

Pola Idiomatik #

Worker Pool #

func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                results <- job * job  // proses job
            }
        }(i)
    }
    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    go workerPool(5, jobs, results)

    for i := 1; i <= 20; i++ {
        jobs <- i
    }
    close(jobs)

    for r := range results {
        fmt.Println(r)
    }
}

Pipeline #

// Stage 1: generate numbers
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Stage 2: square each number
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // Rangkai pipeline
    c := generate(2, 3, 4, 5)
    out := square(square(c))  // square dua kali

    for v := range out {
        fmt.Println(v)  // 16, 81, 256, 625
    }
}

Contoh Program Lengkap #

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// Job merepresentasikan pekerjaan yang perlu diproses
type Job struct {
    ID    int
    Value int
}

// Result merepresentasikan hasil pemrosesan
type Result struct {
    JobID  int
    Output int
    Worker int
}

// Stats melacak statistik dengan operasi atomik
type Stats struct {
    processed int64
    errors    int64
    totalTime int64
}

func (s *Stats) RecordSuccess(elapsed time.Duration) {
    atomic.AddInt64(&s.processed, 1)
    atomic.AddInt64(&s.totalTime, int64(elapsed))
}

func (s *Stats) RecordError() {
    atomic.AddInt64(&s.errors, 1)
}

func (s *Stats) Report() {
    processed := atomic.LoadInt64(&s.processed)
    errors := atomic.LoadInt64(&s.errors)
    totalTime := atomic.LoadInt64(&s.totalTime)

    fmt.Printf("\n=== Statistik ===\n")
    fmt.Printf("Berhasil diproses : %d\n", processed)
    fmt.Printf("Error             : %d\n", errors)
    if processed > 0 {
        avg := time.Duration(totalTime / processed)
        fmt.Printf("Rata-rata waktu   : %v\n", avg)
    }
}

// worker memproses job dari jobCh dan mengirim hasil ke resultCh
func worker(
    ctx context.Context,
    id int,
    jobCh <-chan Job,
    resultCh chan<- Result,
    stats *Stats,
    wg *sync.WaitGroup,
) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: context dibatalkan, berhenti\n", id)
            return

        case job, ok := <-jobCh:
            if !ok {
                fmt.Printf("Worker %d: jobCh ditutup, selesai\n", id)
                return
            }

            start := time.Now()

            // Simulasi pekerjaan dengan durasi acak
            delay := time.Duration(rand.Intn(100)) * time.Millisecond
            select {
            case <-time.After(delay):
                // pekerjaan selesai
            case <-ctx.Done():
                stats.RecordError()
                return
            }

            // Simulasi error 10% dari waktu
            if rand.Float32() < 0.1 {
                stats.RecordError()
                fmt.Printf("Worker %d: error saat proses job #%d\n", id, job.ID)
                continue
            }

            elapsed := time.Since(start)
            stats.RecordSuccess(elapsed)

            result := Result{
                JobID:  job.ID,
                Output: job.Value * job.Value,
                Worker: id,
            }

            select {
            case resultCh <- result:
            case <-ctx.Done():
                return
            }
        }
    }
}

func main() {
    const (
        numWorkers = 5
        numJobs    = 30
        timeout    = 3 * time.Second
    )

    // Context dengan timeout
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    jobCh := make(chan Job, numJobs)
    resultCh := make(chan Result, numJobs)
    stats := &Stats{}

    // Mulai worker pool
    var wg sync.WaitGroup
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(ctx, i, jobCh, resultCh, stats, &wg)
    }

    // Tutup resultCh setelah semua worker selesai
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    // Kirim semua job
    for i := 1; i <= numJobs; i++ {
        select {
        case jobCh <- Job{ID: i, Value: i}:
        case <-ctx.Done():
            fmt.Println("Timeout saat mengirim job!")
            break
        }
    }
    close(jobCh)

    // Kumpulkan hasil
    fmt.Printf("Memulai %d job dengan %d worker...\n\n", numJobs, numWorkers)
    received := 0
    for result := range resultCh {
        received++
        fmt.Printf("Job #%-3d → %d² = %d (Worker %d)\n",
            result.JobID, result.JobID, result.Output, result.Worker)
    }

    stats.Report()
    fmt.Printf("Total hasil diterima: %d dari %d job\n", received, numJobs)
}

Ringkasan #

  • Goroutine sangat ringan (~2KB stack awal) — bisa jalankan ratusan ribu goroutine dalam satu proses.
  • wg.Add(1) sebelum go, bukan di dalam goroutine — sinyal ke WaitGroup harus diberikan sebelum goroutine berjalan.
  • Unbuffered channel menyebabkan pengirim dan penerima saling menunggu — sinkronisasi ketat.
  • Buffered channel memungkinkan pengirim tidak block selama buffer belum penuh — decoupling produsen dan konsumen.
  • Hanya pengirim yang boleh close channel — menutup dari penerima atau menutup yang sudah ditutup menyebabkan panic.
  • select untuk multiplexing channel; gunakan default untuk non-blocking, time.After untuk timeout.
  • sync.Mutex untuk shared state yang sering ditulis; sync.RWMutex untuk read-heavy workload.
  • sync/atomic lebih ringan dari mutex untuk operasi sederhana pada tipe numerik.
  • Selalu jalankan dengan -race saat development dan testing untuk mendeteksi race condition.
  • Context untuk menyebarkan cancellation dan deadline ke seluruh call chain dan goroutine.
  • Goroutine leak terjadi ketika goroutine tidak pernah berhenti — selalu sediakan mekanisme stop via context atau done channel.

← Sebelumnya: Keyword   Berikutnya: I/O →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact