Goroutines, Channels, dan Select: Mental Model Concurrency Go
Materi mendalam tentang goroutine, channel, select, pipeline, worker pool, backpressure, cancellation, dan failure mode concurrency Go untuk software engineer.
Goroutines, Channels, dan Select: Mental Model Concurrency Go
Target part ini: kamu mampu menulis concurrent Go code yang bukan hanya “jalan”, tetapi punya lifecycle, cancellation, ownership, backpressure, dan failure boundary yang jelas.
Concurrency di Go terlihat sederhana karena sintaksnya kecil: go, chan, select. Justru karena kecil, kesalahan desain sering tidak terlihat di permukaan. Goroutine yang bocor, channel yang tidak pernah ditutup, send yang blocking selamanya, worker pool yang tidak punya shutdown path, atau pipeline yang tidak propagate error adalah bug produksi yang sulit didiagnosis.
Part ini mengikuti pendekatan Kaufman: kita tidak mencoba menghafal semua pattern concurrency. Kita dekomposisi skill menjadi beberapa sub-skill penting: membuat goroutine, berkomunikasi lewat channel, memilih event dengan select, menutup lifecycle, mencegah leak, mengatur backpressure, dan mendesain ownership data.
1. Mental Model Utama
Go mendorong model:
Do not communicate by sharing memory; share memory by communicating.
Kalimat ini bukan aturan absolut. Go tetap menyediakan sync.Mutex, sync.WaitGroup, atomic, dan primitive lain untuk shared memory. Maksudnya: jika alur kerja dapat dimodelkan sebagai aliran data atau event, channel sering membuat ownership lebih jelas daripada shared mutable state.
Namun channel bukan pengganti semua lock. Channel adalah alat koordinasi dan komunikasi. Mutex adalah alat proteksi shared state. Salah memilih alat menghasilkan kode yang lebih kompleks dari masalahnya.
2. Concurrency vs Parallelism
Concurrency dan parallelism sering dicampur.
Concurrency adalah struktur program: banyak pekerjaan dapat berjalan tumpang tindih secara logis.
Parallelism adalah eksekusi fisik: banyak pekerjaan benar-benar dieksekusi bersamaan di core berbeda.
Go membuat concurrency murah lewat goroutine. Apakah goroutine berjalan paralel bergantung pada scheduler, jumlah core, blocking operation, dan nilai GOMAXPROCS.
Contoh concurrency tanpa guarantee parallelism:
package main
import "fmt"
func main() {
go func() {
fmt.Println("work from goroutine")
}()
fmt.Println("work from main")
}
Program ini punya bug subtle: program bisa selesai sebelum goroutine sempat menulis output. Goroutine tidak otomatis membuat program menunggu.
Versi yang benar butuh koordinasi:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("work from goroutine")
}()
fmt.Println("work from main")
wg.Wait()
}
sync.WaitGroup akan dibahas lebih dalam di Part 14. Di part ini, fokusnya adalah model goroutine dan channel.
3. Goroutine: Unit Concurrency Ringan
Goroutine adalah fungsi yang berjalan secara concurrent dengan goroutine lain dalam program yang sama.
go doWork()
Atau anonymous function:
go func() {
// concurrent work
}()
Mental model yang perlu kamu pegang:
- Goroutine murah, tetapi bukan gratis.
- Goroutine punya stack yang dapat tumbuh.
- Goroutine harus punya lifecycle yang jelas.
- Goroutine yang blocked selamanya adalah leak.
- Program tidak menunggu goroutine kecuali kamu menyuruhnya.
4. Bug Pertama: Goroutine Tanpa Lifecycle
Kode berikut sering muncul pada pemula:
func FireAndForget(job Job) {
go func() {
process(job)
}()
}
Masalahnya bukan pada go func. Masalahnya adalah tidak ada jawaban untuk pertanyaan:
- Siapa yang menunggu hasilnya?
- Bagaimana error dikembalikan?
- Bagaimana jika
processhang? - Bagaimana jika request dibatalkan?
- Bagaimana ketika service shutdown?
- Apakah boleh job hilang?
Fire-and-forget jarang benar di backend system. Lebih sering itu berarti “failure-and-forget”.
Versi lebih sehat minimal menerima context.Context dan menyediakan error path:
func ProcessAsync(ctx context.Context, job Job) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
if err := process(ctx, job); err != nil {
errCh <- err
}
}()
return errCh
}
Catatan penting: errCh diberi buffer 1 agar goroutine tidak blocked jika caller tidak segera menerima error. Ini bukan solusi universal, tetapi untuk single result channel, buffer satu sering masuk akal.
5. Channel: Typed Conduit
Channel adalah conduit bertipe untuk mengirim nilai antar goroutine.
ch := make(chan string)
Mengirim:
ch <- "hello"
Menerima:
msg := <-ch
Channel punya tipe:
chan int
chan string
chan Order
chan error
Channel juga bisa diarahkan:
func producer(out chan<- int) {
out <- 1
}
func consumer(in <-chan int) {
value := <-in
_ = value
}
Directional channel membuat kontrak fungsi lebih jelas. Jika fungsi hanya mengirim, gunakan chan<- T. Jika hanya menerima, gunakan <-chan T.
6. Unbuffered Channel
Unbuffered channel dibuat tanpa capacity:
ch := make(chan int)
Send ke unbuffered channel akan block sampai ada receiver. Receive dari unbuffered channel akan block sampai ada sender.
func main() {
ch := make(chan string)
go func() {
ch <- "ready"
}()
msg := <-ch
fmt.Println(msg)
}
Unbuffered channel adalah mekanisme komunikasi sekaligus sinkronisasi. Ketika send dan receive bertemu, ada handoff eksplisit.
Unbuffered channel cocok untuk:
- handoff langsung;
- synchronization point;
- pipeline stage yang ingin backpressure natural;
- event yang tidak boleh ditumpuk sembarangan.
7. Buffered Channel
Buffered channel punya capacity:
ch := make(chan int, 10)
Send akan block hanya ketika buffer penuh. Receive akan block hanya ketika buffer kosong.
ch := make(chan int, 2)
ch <- 1
ch <- 2
// ch <- 3 // akan block jika tidak ada receiver
fmt.Println(<-ch)
fmt.Println(<-ch)
Buffered channel berguna untuk smoothing burst kecil. Tetapi buffer bukan pengganti desain capacity.
Jika kamu menaikkan buffer dari 10 ke 10000 hanya untuk “menghilangkan blocking”, kamu mungkin sedang menyembunyikan masalah backpressure.
8. Channel Sebagai Queue: Hati-hati
Channel sering dipakai sebagai queue. Itu boleh, tetapi jangan lupa:
- channel in-memory hilang saat process mati;
- channel tidak punya persistence;
- channel tidak punya retry durable;
- channel tidak punya visibility timeout;
- channel tidak punya dead-letter queue;
- channel tidak cocok untuk job queue lintas instance.
Channel cocok untuk koordinasi internal process. Untuk distributed queue, gunakan message broker, database-backed queue, atau persistent log sesuai kebutuhan.
9. Closing Channel
Channel dapat ditutup oleh sender:
close(ch)
Receiver dapat mendeteksi channel sudah ditutup:
value, ok := <-ch
if !ok {
// channel closed
}
Atau menggunakan range:
for value := range ch {
fmt.Println(value)
}
Rule penting:
Sender closes the channel. Receiver does not close it.
Mengapa? Karena hanya sender yang tahu kapan tidak ada lagi value yang akan dikirim.
Anti-pattern:
func consume(ch chan int) {
defer close(ch) // buruk: consumer tidak punya ownership untuk close
for v := range ch {
fmt.Println(v)
}
}
Jika banyak sender, close harus dikoordinasikan. Jangan biarkan banyak goroutine berebut close, karena close pada channel yang sudah closed akan panic.
10. Nil Channel
Nil channel akan block selamanya saat send atau receive.
var ch chan int
// ch <- 1 // block forever
// <-ch // block forever
Ini sering bug, tetapi bisa dipakai secara sengaja dalam select untuk enable/disable case.
var out chan<- int
if ready {
out = resultCh
}
select {
case out <- value:
// hanya aktif jika out bukan nil
case <-ctx.Done():
return ctx.Err()
}
Namun pattern nil channel harus digunakan hati-hati karena menambah complexity.
11. Select
select menunggu beberapa operasi channel.
select {
case msg := <-ch1:
fmt.Println("from ch1", msg)
case msg := <-ch2:
fmt.Println("from ch2", msg)
case <-time.After(time.Second):
fmt.Println("timeout")
}
Jika beberapa case siap, Go memilih salah satu secara pseudo-random. Jangan membuat logic yang bergantung pada urutan prioritas case biasa.
12. Select dengan Context
Pattern paling umum di service Go:
select {
case result := <-resultCh:
return result, nil
case <-ctx.Done():
return Result{}, ctx.Err()
}
Ini membuat operasi cancellation-aware.
Contoh worker:
func worker(ctx context.Context, jobs <-chan Job, results chan<- Result) {
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
result := process(job)
select {
case results <- result:
case <-ctx.Done():
return
}
}
}
}
Perhatikan ada dua select:
- saat menerima job;
- saat mengirim result.
Tanpa select kedua, worker bisa blocked saat mengirim result jika receiver sudah berhenti.
13. Timeout: Hindari time.After di Loop Panjang
Untuk operasi sekali pakai, time.After sederhana:
select {
case v := <-ch:
return v, nil
case <-time.After(500 * time.Millisecond):
return 0, errors.New("timeout")
}
Untuk loop panjang, pertimbangkan time.NewTimer atau time.NewTicker agar lifecycle eksplisit.
timer := time.NewTimer(500 * time.Millisecond)
defer timer.Stop()
select {
case v := <-ch:
return v, nil
case <-timer.C:
return 0, errors.New("timeout")
}
Untuk interval berulang:
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
collectMetrics()
}
}
Timer dan ticker harus dihentikan jika tidak dipakai lagi, terutama pada lifecycle panjang.
14. Fan-out dan Fan-in
Fan-out: satu stream pekerjaan dibagi ke banyak worker.
Fan-in: hasil dari banyak worker digabung ke satu stream.
Contoh worker pool sederhana:
type Job struct {
ID int
}
type Result struct {
JobID int
Value string
}
func worker(id int, jobs <-chan Job, results chan<- Result) {
for job := range jobs {
results <- Result{
JobID: job.ID,
Value: fmt.Sprintf("worker-%d processed job-%d", id, job.ID),
}
}
}
func main() {
jobs := make(chan Job)
results := make(chan Result)
workerCount := 3
jobCount := 10
for i := 1; i <= workerCount; i++ {
go worker(i, jobs, results)
}
go func() {
defer close(jobs)
for i := 1; i <= jobCount; i++ {
jobs <- Job{ID: i}
}
}()
for i := 0; i < jobCount; i++ {
fmt.Println(<-results)
}
}
Kode ini jalan, tetapi belum sempurna. Masalahnya:
resultstidak ditutup;- tidak ada cancellation;
- tidak ada error path;
- worker bisa blocked saat mengirim result jika consumer berhenti;
- tidak ada shutdown coordination.
Versi produksi butuh desain lifecycle lebih eksplisit.
15. Worker Pool dengan Context dan Error
Contoh lebih realistis:
type Job struct {
ID int
}
type Result struct {
JobID int
Value string
}
func process(ctx context.Context, job Job) (Result, error) {
select {
case <-ctx.Done():
return Result{}, ctx.Err()
default:
}
return Result{
JobID: job.ID,
Value: fmt.Sprintf("processed-%d", job.ID),
}, nil
}
func worker(
ctx context.Context,
jobs <-chan Job,
results chan<- Result,
errs chan<- error,
) {
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
result, err := process(ctx, job)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}
select {
case results <- result:
case <-ctx.Done():
return
}
}
}
}
Untuk production code, kamu biasanya akan memakai errgroup dari golang.org/x/sync/errgroup agar cancellation dan error propagation lebih rapi. Tetapi penting memahami primitive dasarnya dulu.
16. Pipeline
Pipeline adalah rangkaian stage. Setiap stage menerima stream input, memproses, lalu mengirim stream output.
Contoh sederhana:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
for n := range square(gen(1, 2, 3)) {
fmt.Println(n)
}
}
Ini contoh bagus untuk konsep, tetapi belum cancellation-aware.
17. Pipeline dengan Cancellation
Masalah pipeline klasik: downstream berhenti membaca sebelum upstream selesai mengirim. Upstream akan blocked, lalu goroutine leak.
Versi lebih aman:
func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case n, ok := <-in:
if !ok {
return
}
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}
}()
return out
}
Consumer:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
out := square(ctx, gen(ctx, 1, 2, 3, 4, 5))
fmt.Println(<-out)
cancel() // hentikan upstream karena kita hanya butuh satu hasil
}
18. Backpressure
Backpressure adalah kemampuan sistem untuk memberi sinyal bahwa downstream tidak sanggup menerima data lebih cepat.
Tanpa backpressure:
producer speed > consumer speed -> queue grows -> memory grows -> latency grows -> crash
Channel unbuffered memberi backpressure natural: producer tidak bisa send jika consumer belum siap.
Buffered channel memberi elastisitas terbatas: producer boleh sedikit lebih cepat, tetapi tetap blocked saat buffer penuh.
Unbounded queue adalah bahaya jika tidak ada limit. Di Go, channel selalu bounded jika buffered. Unbuffered berarti capacity 0. Ini bagus karena memaksa kamu memikirkan kapasitas.
Desain capacity harus menjawab:
- berapa worker count;
- berapa maksimal queued work;
- apa yang terjadi saat queue penuh;
- apakah request ditolak, diblokir, atau dijatuhkan;
- apakah caller mendapat sinyal overload;
- apakah ada metrics queue length;
- apakah ada timeout saat enqueue.
Contoh enqueue dengan timeout:
func Enqueue(ctx context.Context, jobs chan<- Job, job Job) error {
select {
case jobs <- job:
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
return errors.New("job queue is full")
}
}
Dalam code produksi, lebih baik timeout berasal dari context caller, bukan time.After hard-coded.
19. Goroutine Leak
Goroutine leak terjadi ketika goroutine tidak pernah selesai padahal tidak lagi berguna.
Contoh leak:
func leaky() <-chan int {
ch := make(chan int)
go func() {
ch <- 42 // blocked forever jika tidak ada receiver
}()
return ch
}
func main() {
_ = leaky()
}
Goroutine di dalam leaky blocked saat mengirim ke channel yang tidak pernah dibaca.
Versi lebih aman:
func notLeaky(ctx context.Context) <-chan int {
ch := make(chan int, 1)
go func() {
defer close(ch)
select {
case ch <- 42:
case <-ctx.Done():
}
}()
return ch
}
Buffer 1 dapat membantu untuk single result, tetapi context tetap penting untuk cancellation.
20. Common Leak Patterns
20.1 Sender Blocked
results <- result
Jika receiver berhenti, sender blocked.
Solusi:
select {
case results <- result:
case <-ctx.Done():
return
}
20.2 Receiver Blocked
job := <-jobs
Jika tidak ada sender dan channel tidak pernah ditutup, receiver blocked.
Solusi:
select {
case job, ok := <-jobs:
if !ok {
return
}
_ = job
case <-ctx.Done():
return
}
20.3 Ticker Tidak Dihentikan
ticker := time.NewTicker(time.Second)
for range ticker.C {
// work
}
Solusi:
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// work
case <-ctx.Done():
return
}
}
20.4 Background Goroutine Tanpa Shutdown
go refreshCacheForever()
Solusi:
go refreshCache(ctx)
Pastikan refreshCache keluar ketika ctx.Done() closed.
21. Ownership Data
Dalam concurrent code, pertanyaan terpenting bukan “pakai channel atau mutex?”, tetapi:
Who owns this data at this moment?
Jika data dikirim lewat channel dan tidak lagi dimutasi oleh sender, ownership berpindah ke receiver.
Baik:
func producer(out chan<- []byte) {
buf := []byte("hello")
out <- buf
// jangan mutasi buf setelah dikirim jika receiver menganggap owner
}
Buruk:
func producer(out chan<- []byte) {
buf := make([]byte, 0, 1024)
buf = append(buf, "hello"...)
out <- buf
buf[0] = 'H' // race atau semantic bug jika receiver sedang baca
}
Jika kamu mengirim pointer atau slice, kamu mengirim referensi ke data. Channel tidak otomatis melakukan deep copy.
22. Channel of Struct vs Pointer
Kirim value jika:
- struct kecil;
- immutable setelah dibuat;
- ingin menghindari shared mutation;
- copy cost rendah.
Kirim pointer jika:
- object besar;
- memang ada owner tunggal yang jelas;
- mutation dilakukan oleh satu pihak;
- identity penting.
Contoh value yang aman:
type Event struct {
ID string
Amount int64
}
ch := make(chan Event)
Contoh pointer dengan ownership eksplisit:
type Buffer struct {
Data []byte
}
ch := make(chan *Buffer)
Jika pointer dikirim ke banyak consumer, kamu harus punya aturan ownership yang ketat.
23. Error Propagation di Concurrent Code
Kesalahan umum: error terjadi di goroutine, tetapi tidak pernah sampai ke caller.
Buruk:
go func() {
if err := doWork(); err != nil {
log.Println(err)
}
}()
Kode ini hanya log error. Caller tidak tahu operasi gagal.
Lebih baik:
func Do(ctx context.Context) error {
errCh := make(chan error, 1)
go func() {
errCh <- doWork(ctx)
}()
select {
case err := <-errCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
Namun hati-hati: jika doWork tidak menghormati context, goroutine tetap bisa lanjut setelah caller return. Cancellation harus dipropagasi sampai dependency terdalam.
24. First Error Wins
Dalam banyak concurrent workflow, jika satu worker gagal, kamu ingin membatalkan sisanya.
Pattern manual:
func RunAll(ctx context.Context, funcs ...func(context.Context) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errCh := make(chan error, len(funcs))
for _, fn := range funcs {
fn := fn
go func() {
errCh <- fn(ctx)
}()
}
for range funcs {
if err := <-errCh; err != nil {
cancel()
return err
}
}
return nil
}
Ini masih punya caveat: goroutine lain mungkin masih berjalan ketika function return, meskipun context sudah canceled. Untuk produksi, tunggu semua selesai atau gunakan errgroup.
25. Loop Variable Capture
Di Go modern, range variable semantics sudah lebih aman dibanding Go lama. Namun kamu tetap perlu paham closure capture agar tidak membuat bug saat menggunakan variable dari outer scope.
Safe explicit capture tetap sering dipakai karena memperjelas intent:
for _, job := range jobs {
job := job
go func() {
process(job)
}()
}
Untuk loop index biasa, tetap berhati-hati:
for i := 0; i < 10; i++ {
i := i
go func() {
fmt.Println(i)
}()
}
Prinsipnya: goroutine bisa berjalan setelah iterasi berubah. Capture value yang benar jika value tersebut penting.
26. Select Default
default membuat select non-blocking.
select {
case msg := <-ch:
fmt.Println(msg)
default:
fmt.Println("no message")
}
Ini berguna, tetapi bisa menyebabkan busy loop:
for {
select {
case msg := <-ch:
handle(msg)
default:
// CPU spin jika tidak ada sleep atau blocking work
}
}
Jika kamu butuh loop periodik, gunakan ticker. Jika kamu butuh menunggu event, jangan pakai default.
27. Channel Close Broadcast
Closed channel dapat dipakai sebagai broadcast signal karena receive dari closed channel langsung siap.
done := make(chan struct{})
for i := 0; i < 3; i++ {
go func(id int) {
<-done
fmt.Println("stopped", id)
}(i)
}
close(done)
Namun untuk cancellation modern, lebih umum gunakan context.Context.
chan struct{} tetap berguna untuk internal signal sederhana, terutama ketika tidak perlu deadline/value.
28. Semaphore dengan Buffered Channel
Buffered channel dapat dipakai sebagai semaphore untuk membatasi concurrency.
func ProcessAll(ctx context.Context, items []Item, limit int) error {
sem := make(chan struct{}, limit)
errCh := make(chan error, len(items))
for _, item := range items {
item := item
select {
case sem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
go func() {
defer func() { <-sem }()
errCh <- process(ctx, item)
}()
}
for range items {
if err := <-errCh; err != nil {
return err
}
}
return nil
}
Caveat:
- error pertama tidak membatalkan pekerjaan lain;
- function return saat error bisa meninggalkan goroutine masih berjalan;
- lebih baik gunakan context cancellation dan wait group/errgroup untuk produksi.
29. Designing Concurrent APIs
API concurrent yang baik menjawab:
- Siapa membuat goroutine?
- Siapa menghentikan goroutine?
- Siapa menutup channel?
- Apakah caller wajib membaca channel sampai habis?
- Bagaimana error dikembalikan?
- Bagaimana cancellation bekerja?
- Apakah function return sebelum semua goroutine selesai?
- Apakah ada limit concurrency?
- Apakah ada backpressure?
- Apakah data yang dikirim immutable atau owned?
API buruk:
func Watch() <-chan Event
Pertanyaan yang tidak terjawab:
- kapan berhenti?
- bagaimana error?
- siapa cleanup resource?
API lebih baik:
func Watch(ctx context.Context) (<-chan Event, <-chan error)
Atau callback style:
func Watch(ctx context.Context, handle func(Event) error) error
Callback style sering lebih mudah mengelola lifecycle karena error dan cancellation tetap berada dalam call stack.
30. Callback vs Channel API
Channel API cocok jika:
- caller memang ingin stream;
- caller butuh compose pipeline;
- lifecycle jelas dengan context;
- error path jelas.
Callback API cocok jika:
- library ingin mengontrol lifecycle;
- error harus langsung memutus loop;
- resource cleanup penting;
- tidak ingin memaksa caller drain channel.
Channel API:
for event := range client.Events(ctx) {
handle(event)
}
Callback API:
err := client.Watch(ctx, func(event Event) error {
return handle(event)
})
Tidak ada pemenang absolut. Pilih berdasarkan lifecycle dan failure semantics.
31. Production Example: Bounded Email Sender
Kita buat contoh bounded worker pool. Requirement:
- menerima email job;
- maksimal N worker;
- queue bounded;
- enqueue menghormati context;
- shutdown menunggu worker selesai;
- error per job dikembalikan lewat result;
- tidak ada goroutine leak.
type EmailJob struct {
ID string
To string
Subject string
Body string
}
type EmailResult struct {
JobID string
Err error
}
type EmailSender interface {
Send(ctx context.Context, job EmailJob) error
}
type Pool struct {
sender EmailSender
jobs chan EmailJob
results chan EmailResult
}
func NewPool(sender EmailSender, queueSize int) *Pool {
return &Pool{
sender: sender,
jobs: make(chan EmailJob, queueSize),
results: make(chan EmailResult, queueSize),
}
}
func (p *Pool) Start(ctx context.Context, workers int) {
for i := 0; i < workers; i++ {
go p.worker(ctx)
}
}
func (p *Pool) Submit(ctx context.Context, job EmailJob) error {
select {
case p.jobs <- job:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (p *Pool) Results() <-chan EmailResult {
return p.results
}
func (p *Pool) Stop() {
close(p.jobs)
}
func (p *Pool) worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case job, ok := <-p.jobs:
if !ok {
return
}
err := p.sender.Send(ctx, job)
result := EmailResult{JobID: job.ID, Err: err}
select {
case p.results <- result:
case <-ctx.Done():
return
}
}
}
}
Kode ini masih punya kelemahan: results tidak ditutup karena Pool belum melacak semua worker selesai. Itu akan diperbaiki dengan sync.WaitGroup di Part 14.
Pelajaran penting: channel saja tidak cukup untuk lifecycle kompleks. Kamu sering butuh kombinasi channel, context, dan sync primitive.
32. Anti-pattern: Channel untuk Getter/Setter
Buruk:
type Counter struct {
commands chan command
}
Lalu semua operasi sederhana dikirim ke goroutine owner. Ini bisa benar untuk actor-like state machine, tetapi overkill untuk counter sederhana.
Lebih sederhana:
type Counter struct {
mu sync.Mutex
n int
}
Gunakan goroutine owner jika:
- state punya event loop natural;
- operasi harus serialized;
- ada timer, I/O, atau lifecycle kompleks;
- kamu ingin menghindari banyak lock di state machine.
Gunakan mutex jika:
- state kecil;
- critical section jelas;
- tidak ada event loop natural;
- operasi cepat dan synchronous.
33. Anti-pattern: Exposing Bidirectional Channel
Buruk:
func Events() chan Event {
return events
}
Caller bisa mengirim atau menutup channel. Itu merusak ownership.
Lebih baik:
func Events() <-chan Event {
return events
}
Untuk input:
func Submit(in chan<- Job, job Job) {
in <- job
}
Directional channel adalah bentuk kecil dari access control.
34. Anti-pattern: Closing Receive-only Channel
Jika function menerima <-chan T, compiler mencegah close:
func consume(in <-chan int) {
// close(in) // compile error
}
Ini bagus. Gunakan directional channel untuk membuat ownership close mustahil dilanggar oleh caller.
35. Anti-pattern: Menggunakan Channel untuk Error Tunggal Tanpa Buffer
Buruk:
func doAsync() <-chan error {
errCh := make(chan error)
go func() {
errCh <- doWork()
}()
return errCh
}
Jika caller tidak receive, goroutine blocked.
Lebih aman:
func doAsync() <-chan error {
errCh := make(chan error, 1)
go func() {
errCh <- doWork()
close(errCh)
}()
return errCh
}
Tetap tidak sempurna jika doWork butuh cancellation. Tambahkan context jika operasinya bisa lama.
36. Checklist Review Concurrency Go
Gunakan checklist ini saat review:
| Area | Pertanyaan |
|---|---|
| Lifecycle | Apakah setiap goroutine punya exit path? |
| Cancellation | Apakah context.Context dipropagasi ke operasi blocking? |
| Channel ownership | Siapa yang menutup channel? |
| Send blocking | Apakah send bisa blocked selamanya? |
| Receive blocking | Apakah receive bisa blocked selamanya? |
| Error path | Apakah error dari goroutine sampai ke caller? |
| Backpressure | Apakah queue/channel punya capacity dan overload behavior jelas? |
| Data ownership | Apakah pointer/slice yang dikirim masih dimutasi sender? |
| Shutdown | Apakah service bisa berhenti tanpa leak? |
| Observability | Apakah ada metrics untuk queue, worker, error, latency? |
37. Latihan Terarah
Latihan 1 — Single Result Channel
Buat function:
func FetchAsync(ctx context.Context, id string) <-chan Result
Constraint:
- goroutine tidak boleh leak;
- result channel harus closed;
- jika context canceled, worker harus berhenti;
- result channel buffer
1.
Latihan 2 — Worker Pool
Buat worker pool untuk memproses []Job.
Constraint:
- worker count configurable;
- input channel closed oleh producer;
- result channel closed setelah semua worker selesai;
- error path tersedia;
- tidak ada goroutine leak jika context canceled.
Latihan 3 — Pipeline dengan Early Stop
Buat pipeline:
generate -> parse -> validate -> sink
Consumer berhenti setelah menemukan satu item valid. Pastikan upstream berhenti juga.
Latihan 4 — Backpressure
Buat queue bounded dengan capacity 10. Jika queue penuh, Submit harus return error saat context timeout.
38. Rubrik Self-assessment
Kamu siap lanjut ke Part 14 jika bisa menjawab tanpa melihat catatan:
- Apa beda unbuffered dan buffered channel?
- Siapa yang harus close channel?
- Mengapa send ke channel bisa menyebabkan goroutine leak?
- Mengapa receive dari channel bisa blocked selamanya?
- Kapan memakai
selectdenganctx.Done()? - Apa itu fan-out dan fan-in?
- Apa itu backpressure?
- Mengapa channel bukan durable queue?
- Kapan channel lebih tepat daripada mutex?
- Kapan mutex lebih tepat daripada channel?
39. Ringkasan
Concurrency Go kuat karena primitive-nya kecil dan composable. Tetapi kekuatan itu baru aman jika kamu mendesain lifecycle secara eksplisit.
Pegang prinsip berikut:
Every goroutine must have a reason to start and a path to stop.
Channel bukan sekadar tempat lewat data. Channel adalah kontrak komunikasi, sinkronisasi, ownership, dan backpressure. Jika kontrak itu tidak jelas, bug-nya biasanya muncul bukan saat unit test, tetapi saat production overload, shutdown, timeout, atau partial failure.
Di Part 14, kita akan masuk ke primitive sync: Mutex, RWMutex, WaitGroup, Once, Cond, Map, dan atomic. Tujuannya bukan mengganti channel, tetapi memahami kapan shared memory lebih sederhana, lebih jelas, dan lebih benar.
You just completed lesson 13 in build core. Use the series map if you want to review the broader track, or continue directly into the next lesson while the context is still warm.
Keep the momentum while the lesson is still fresh. Move backward for review or continue forward into the next concept.