Menggunakan Fitur Concurrency di Go

23 August 2023
ยท
14 min read
Menggunakan Fitur Concurrency di Go

Intro

Salah satu nilai jual dari bahasa Go adalah fitur concurrency, fitur ini sering menjadi alasan utama kebanyakan orang memilih Go. Diartikel ini kita akan membahas tool yang tersedia di Go untuk membuat kode yang concurrent, diantara adalah sebagai berikut.

  • Goroutine
  • Channel
  • Select
  • Package Sync

Concurrency Vs Paralelisme

Go mempunyai fitur concurrency secara native, tetapi terkadang ada kesalah-pahaman bahwa concurrency dan paralelisme adalah kedua hal yang sama, padahal kedua istilah ini memiliki makna yang berbeda.

concurrency adalah tentang bagaimana mengatur dan berurusan dengan banyak pekerjaan dalam satu waktu. Apa maksudnya ?

Pada dasarnya 1 CPU hanya bisa melakukan 1 pekerjaan dalam 1 waktu, sebagai contoh jika kita membuka banyak program sekaligus diperangkat yang memiliki 1 CPU, secara sederhana CPU tersebut akan berpindah-pindah (context switch) dengan cepat antara program 1 dengan program yang lain. hal ini menciptkan sebuah ilusi seolah-olah 1 CPU tersebut melakukan banyak pekerjaan secara sekaligus, padahal kenyataanya tidak. inilah yang disebut concurrency.

Concurrency

Dilain itu, paralel adalah beberapa pekerjaan dikerjakan secara bersamaan, ini mengharuskan perangkat yang digunakan mempunyai lebih dari 1 CPU.

Paralel

Kode yang didesain concurrent bisa saja berjalan secara paralel (dilingkungan multi CPU), tetapi hal ini bukanlah keharusan.

Di Go, kita bisa menggunakan Goroutine untuk membuat program yang bersifat concurrent, fitur tersebut diharapkan membuat program yang kita buat menjadi lebih efektif, baik dilingkungan single CPU ataupun multi CPU. Tetapi yang perlu diingat adalah, menggunakan fitur concurrency bukan berarti program yang kita buat otomatis menjadi lebih cepat, ini sangat bergantung dari algoritma yang dibuat dan jumlah core yang dimiliki.

Apa itu Goroutine ?

Dibeberapa bahasa pemrograman kita bisa menggunakan OS threads (thread pada level sistem operasi) secara langsung untuk melakukan pekerjaan secara paralel, tetapi hal tersebut tidak terlalu efisien dari sisi performa dan memori karena perlunya berkomunikasi dengan resource dilevel sistem operasi. hal ini menjadi salah satu alasan utama dibuatnya fitur concurrency secara native didalam bahasa Go.

Goroutine adalah sebuah function yang dieksekusi secara independen dan dapat berjalan secara bersamaan (concurrent) dengan function lainnya. Goroutine ini mirip thread, bedanya goroutine dimanage oleh Go runtime, sedangkan thread dimanage oleh sistem operasi, hal ini yang membuat goroutine menjadi lebih ringan dan efisien. 1 Goroutine hanya memakan memori beberapa kilobyte saja, secara teknis membuat puluhan ribu goroutine bukanlah sebuah masalah.

Cara Menggunakan Goroutine

Untuk menjalankan sebuah Goroutine kita bisa menggunakan keyword Go sebelum menjalankan sebuah function.

go doSomething()

ketika kita menggunakan keyword go tersebut, function tersebut akan berjalan dibackground secara concurrent, yang artinya kita tidak perlu menunggu function tersebut selesai untuk melanjutkan eksekusi berikutnya.

berikut contoh kodenya secara lengkap.

package main
 
import (
	"fmt"
	"time"
)
 
func main() {
	// jalankan 2 goroutine secara concurrent
	go count("A")
	go count("B")
 
	fmt.Println("Main goroutine finished")
}
 
func count(s string) {
	for i := 1; i <= 3; i++ {
		time.Sleep(1 * time.Second)
		fmt.Println(s, ": ", i)
	}
}

Output

Main goroutine finished

dilihat dari output diatas, bisa diketahui bahwa program yang baru saja kita jalankan langsung exit begitu saja, padahal kita menjalankan 2 function count dengan goroutine yang membutuhkan waktu sekitar 3 detik, hal ini karena jika goroutine utama berakhir (function main), maka program akan keluar dan goroutine yang berjalan dibackground akan terhenti. mari kita ubah kode diatas menjadi sebagai berikut

package main
 
import (
	"fmt"
	"time"
)
 
func main() {
	// Starting two goroutines
	go count("A")
	go count("B")
 
	// Wait for goroutines to finish before main goroutine ends
	time.Sleep(time.Second * 4)
	fmt.Println("Main goroutine finished")
}
 
func count(s string) {
	for i := 1; i <= 3; i++ {
		time.Sleep(1 * time.Second)
		fmt.Println(s, ": ", i)
	}
}

Output

A :  1
B :  1
A :  2
B :  2
B :  3
A :  3
Main goroutine finished

setelah kita tambahkan fungsi sleep di main goroutine selama 4 detik, dari situ kita bisa melihat dua function count tadi dapat berjalan. dari outputnya kita juga bisa melihat output dari masing-masing function saling tumpang tindih, hal ini menandakan kedua function berjalan secara concurrent.

Berkomunikasi menggunakan Channel

ketika kita menggunakan goroutine, kita tidak bisa mengakses return value dari function tersebut. namun, ada saatnya kita membutuhkan 2 atau lebih goroutine untuk berkomunikasi/bertukar data. disituasi tersebut, kita bisa menggunakan channel sebagai media komunikasi.

untuk membuat channel kita dapat menggunakan function make() seperti berikut

ch := make(chan int) //membuat channel dengan tipe data integer

untuk mengirim data ke channel kita bisa menggunakan code seperti berikut

ch <- b   // kirim data dari variabel b ke channel ch

dan untuk menerima data kita seperti berikut

a := <-ch // terima data dari channel ch kemudian assign ke variable a

berikut contoh kodenya secara lengkap

package main
 
import (
	"fmt"
)
 
func main() {
	// buat channel bertipe string
	msgChannel := make(chan string)
	// eksekusi function dengan goroutine
	go channelData(msgChannel)
	// baris ini akan mengeblok eksekusi, 
	// sampai ada message yang diterima
	fmt.Println("msgChannel Data:", <-msgChannel)
 
}
 
func channelData(msg chan string) {
  //line ini akan mengeblok eksekusi sampai ada penerima yang mengambil message tersebut
	msg <- "Belajar Go"
}

Output

Belajar Go

Pada kode diatas operasi menerima pesan <-msgChannel akan bersifat blocking, sampai ada pesan yang diterima barulah eksekusi akan berlanjut, hal tersebut hanya berlaku diunbuffered channel yang akan kita bahas dibawah.

For-range dan channel

untuk menerima message dari channel kita juga bisa menggunakan keyword for, terutama jika message yang dikirim berulang-kali dalam 1 channel

for v := range ch {
    fmt.Println(v)
}

berikut contoh lengkap penggunaan for dengan channel

package main
 
import (
	"fmt"
	"time"
)
 
func produce(ch chan int) {
	for i := 0; i < 5; i++ {
		ch <- i
		time.Sleep(time.Second)
	}
	close(ch)
}
 
func main() {
	ch := make(chan int)
 
	go produce(ch)
 
	for val := range ch {
		fmt.Println(val)
	}
}
 

Output

0
1
2
3
4

jika kita melihat kode diatas, kita bisa menggunakan kode for val := range ch untuk menerima message dari channel. Dalam setiap iterasi, variable val akan menerima nilai berikutnya dari message yang dikirim channel ch. iterasi akan terhenti jika channel ch tersebut ditutup dengan function close(ch)

Closing Channel

Setelah kita selesai mengirim data ke sebuah channel, kita dapat menutup channel tersebut menggunakan function close(ch). Penutupan ini mengindikasikan bahwa tidak ada lagi value yang akan dikirim ke channel tersebut. Jika kita mencoba mengirim data ke channel yang telah ditutup, hal tersebut akan menimbulkan panic.

ketika suatu channel ditutup, channel tersebut juga akan memberikan pesan ke penerima, kita bisa mengaksesnya seperti berikut

v, ok := <-ch

ketika variabel ok bernilai false, kita bisa menilai bahwa channel itu sudah ditutup, dan value yang diberikan hanyalah default value saja (zero value) yang biasanya tidak kita gunakan.

hal yang perlu diingat adalah hanya pengirim pesan pada channel (writer) yang mempunyai tanggung jawab untuk menutup channel dan kita hanya perlu menutup channel jika ada goroutine yang perlu tahu kapan suatu channel ditutup (semisal for-range).

Unbuffered dan Buffered Channel

Dibahasa Go ada 2 tipe channel yang tersedia

  1. Unbuffered Channel
  2. Buffered Channel

Secara default, saat kita membuat channel, tipe channel yang dihasilkan adalah Unbuffered. Apa artinya? Unbuffered channel merupakan channel tanpa kapasitas penyimpanan (buffer) untuk menyimpan pesan. Ini berarti setiap operasi pengiriman (send) akan menunggu hingga ada penerima (receive) yang tersedia. Jika tidak ada penerima yang siap, maka operasi pengiriman akan blocking. Perhatikan kode berikut.

package main
 
import "fmt"
 
func main() {
	//buat unbuffered channel bertipe data string
	ch := make(chan string)
	//kirim message ke channel ch
	ch <- "Learning Go channel"
	//terima message dari channel ch dan print hasilnya
	fmt.Println(<-ch)
 
}
 

Output

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /home/ibnu/personal/playground-go/main.go:9 +0x37
exit status 2

pada line ke 9 kita mengirim pesan ke unbuffered channel. Karena channel tersebut tidak mempunya kapasitas penyimpanan, operasi pengiriman tersebut akan menunggu sampai ada penerima yang siap menerima, namun karena operasi pengiriman tadi terblock, operasi penerimaan di line 11 juga tidak akan dieksekusi, inilah yang menyebabkan deadlock.

mari kita coba ubah channel tersebut menjadi buffered channel dengan kapasitas 1

package main
 
import "fmt"
 
func main() {
	//buat buffered channel bertipe data string dengan kapasitar penyimpanan 1
	ch := make(chan string, 1)
	//kirim message ke channel ch
	ch <- "Learning Go channel"
	//terima message dari channel ch dan print hasilnya
	fmt.Println(<-ch)
 
}

Output

Learning Go channel

dari output diatas bisa kita melihat bahwa buffered channel tidak blocking selama kapasitas penyimpanan masih tersedia.

mari kita coba kirim pesan lebih dari 1 kali seperti berikut

package main
 
import "fmt"
 
func main() {
	//buat buffered channel bertipe data string dengan kapasitar penyimpanan 1
	ch := make(chan string, 1)
	//non blocking karena kapasitas channel masih tersedia
	ch <- "Message 1"
	//blocking karena channel penuh
	ch <- "Message 2"
	//terima message dari channel ch dan print hasilnya
	fmt.Println(<-ch)
 
}

Output

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /home/ibnu/personal/playground-go/main.go:10 +0x4b
exit status 2

kode tersebut menjadi blocking/deadlock kembali karena pada line 11 kita mengirim pesan lagi yang membuat kapasitas channel penuh. Hal ini juga dikarenakan message pertama yang dikirim dari channel tersebut juga masih tersimpan.

Select

Ada kalanya ketika kita membuat program dengan Go, kita menggunakan banyak channel untuk berkomunikasi dengan banyak goroutine.

package main
 
import (
	"fmt"
	"time"
)
 
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)
 
	go func() {
		time.Sleep(3 * time.Second)
		ch1 <- "gorutine 1 selesai dalam waktu 3 detik"
	}()
 
	go func() {
		time.Sleep(1 * time.Second)
		ch2 <- "gorutine 2 selesai dalam waktu 1 detik"
	}()
 
	fmt.Println(<-ch1)
	fmt.Println(<-ch2)
}

Output

gorutine 1 selesai dalam waktu 3 detik
gorutine 2 selesai dalam waktu 1 detik

hal yang kurang sesuai dari program diatas adalah meskipun goroutine 2 selesai lebih awal kita tidak bisa menggunakan pesan dari goroutine 2 sebelum goroutine 1 selesai, hal ini bisa menjadi bottleneck yang cukup mengganggu, dari masalah tersebut kita bisa menggunakan select keyword.

package main
 
import (
	"fmt"
	"time"
)
 
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)
 
	go func() {
		time.Sleep(3 * time.Second)
		ch1 <- "gorutine 1 selesai dalam waktu 3 detik"
	}()
 
	go func() {
		time.Sleep(1 * time.Second)
		ch2 <- "gorutine 2 selesai dalam waktu 1 detik"
	}()
 
	select {
	case msg1 := <-ch1:
		fmt.Println(msg1)
	case msg2 := <-ch2:
		fmt.Println(msg2)
	}
}

Output

gorutine 2 selesai dalam waktu 1 detik

dari contoh diatas kita bisa menggunakan keyword select untuk menunggu pesan dari beberapa channel kemudian bereaksi terhadap channel yang pertama kali mengirim pesan.

Agar dapat memproses pesan dari channel selanjutnya kita juga bisa bisa menggunakan for-loop seperti berikut

package main
 
import (
	"fmt"
	"time"
)
 
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)
 
	go func() {
		time.Sleep(3 * time.Second)
		ch1 <- "gorutine 1 selesai dalam waktu 3 detik"
	}()
 
	go func() {
		time.Sleep(1 * time.Second)
		ch2 <- "gorutine 2 selesai dalam waktu 1 detik"
	}()
 
  //loop 2 kali karena hanya ada 2 pesan yang diterima
	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	}
}
 

Output

gorutine 2 selesai dalam waktu 1 detik
gorutine 1 selesai dalam waktu 3 detik

layaknya switch kita juga bisa menggunakan default case jika tidak ada channel yang mengirim message dalam suatu waktu

package main
 
import (
	"fmt"
	"time"
)
 
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)
 
	go func() {
		time.Sleep(3 * time.Second)
		ch1 <- "selesai dalam waktu 3 detik"
	}()
 
	go func() {
		time.Sleep(1 * time.Second)
		ch2 <- "selesai dalam waktu 1 detik"
	}()
 
	select {
	case msg1 := <-ch1:
		fmt.Println(msg1)
	case msg2 := <-ch2:
		fmt.Println(msg2)
	default:
		fmt.Println("tidak ada channel yang siap mengirim data")
	}
 
	fmt.Println("program selesai")
}

Output

tidak ada channel yang siap mengirim data
program selesai

Sync package

Selain menggunakan channel dan select untuk sinkronasi di antara goroutine, Go juga menyediakan metode sinkronasi lain yang lebih primitive melalui package sync.

Dilain itu yang perlu diperhatikan adalah salah satu pepatah dalam Go Proverb "Do not communicate by sharing memory. instead, share memory by communicating". Pepatah ini menunjukan bahwa sinkronasi antar goroutine sebaiknya dilakukan melalui channel. Namun, dalam beberapa situasi, menggunakan package sync bisa saja menjadi pilihan yang lebih tepat karena lebih simple dan performa yang lebih optimal.

Package sync ini mempunyai beberapa fungsi, namun kali ini kita akan membahasan hanya 2 fungsi saja yaitu sync.WaitGroup dan sync.Mutex

sync.WaitGroup

sync.WaitGroup ini sangat cocok ketika kita ingin menunggu beberapa goroutine sekaligus sampai selesai. Berikut contohnya

package main
 
import (
	"fmt"
	"sync"
	"time"
)
 
func doSomeWork(id int) {
 
	time.Sleep(time.Second)
 
	fmt.Println("done", id)
}
 
func main() {
 
	// buat sync.WaitGroup bernama wg
	var wg sync.WaitGroup
 
	// launch 5 goroutine
	const numWorkers = 5
	for i := 0; i < numWorkers; i++ {
		// Increment wg counter setiap menambah 1 goroutine
		wg.Add(1)
		go func(id int) {
			doSomeWork(id)
			// setelah pekerjaan selesai decrement wg counter
			wg.Done()
		}(i)
	}
 
	// line ini akan blocking sampai wg counter bernilai 0
	wg.Wait()
 
	fmt.Println("All workers completed.")
}

Output

done 1
done 0
done 4
done 3
done 2
All goroutine completed.

Pada dasarnya sync.WaitGroup adalah sebuah counter, ketika kita menjalankan sebuah gorutine kita mengeksekusi wg.Add(1) untuk increment counter dan wg.Done() untuk decrement counter setiap pekerjaan selesai. kemudian kita mengeksekusi wg.Wait() yang akan blocking sampai counter bernilai 0 yang menandakan semua pekerjaan telah selesai dan eksekusi bisa dilanjutkan.

sync.Mutex

Ketika kita memiliki sebuah data yang bisa diakses dan dimodikasi oleh banyak goroutine secara bersamaan, hal ini dapat memunculkan masalah. perhatikan kode dibawah

package main
 
import (
	"fmt"
	"sync"
)
 
type UnSafeCounter struct {
	v int
 
}
 
func (c *UnSafeCounter) Increment() {
	c.v++
}
 
func (c *UnSafeCounter) Value() int {
	return c.v
}
 
func main() {
	var c UnSafeCounter
	var wg sync.WaitGroup
 
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Increment()
			wg.Done()
		}()
	}
 
	wg.Wait()
	fmt.Println(c.Value())
}

Output

995

Dari contoh di atas, meskipun kita mengincrement counter sebanyak 1000 kali, output yang dihasilkan bisa jadi tidak 1000. Kasus ini biasa disebut "race condition"

Ketika banyak goroutine mencoba mengakses dan memodifikasi suatu variabel secara bersamaan tanpa koordinasi yang tepat, mereka bisa "menimpa" perubahan yang dilakukan oleh goroutine lain.

Misalnya, jika dua goroutine membaca nilai v pada saat yang sama (misalnya v adalah 5). dua goroutin tadi akan menghasilkan nilai increment akhir yaitu 6, yang semestinya bernilai 7.

Dari kasus diatas maka diperlukanlah mekanisme locking, dimana dalam 1 waktu hanya 1 gorutine yang boleh melakukan increment, di Go kita bisa menggunakan sync.Mutex. Mari kita coba ubah kode counter tadi menjadi concurrent safe

package main
 
import (
	"fmt"
	"sync"
)
 
type SafeCounter struct {
	v   int
	mux sync.Mutex //tambahkan 1 property sync.Mutex
}
 
func (c *SafeCounter) Increment() {
	//kunci counter agar hanya 1 goroutine yang dapat mengakses dalam 1 waktu 
	c.mux.Lock() 
	c.v++
	//buka kunci setelah proses selesai
	c.mux.Unlock()
}
 
func (c *SafeCounter) Value() int {
	c.mux.Lock()
	defer c.mux.Unlock()
	return c.v
}
 
func main() {
	var c SafeCounter
	var wg sync.WaitGroup
 
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Increment()
			wg.Done()
		}()
	}
 
	wg.Wait()
	fmt.Println(c.Value())
}

Output

1000

setelah kita menambah sync.Mutex ke counter tersebut dan menggunakan Lock() dan Unlock() sebelum mengakses dan merubah data, kita bisa memastikan data counter yang dihasilkan benar dan selalu konsisten meskipun ada 1000 goroutine yang melakukan increment secara concurrent.

Penutup

Setelah memahami dasar-dasar concurrency di Go yaitu goroutine, channel, select, dan sync package, diharapkan kita dapat membuat program yang lebih efektif dilingkungan multi CPU.

Langkah berikutnya kamu juga bisa mempelajari pola atau pattern concurrency yang lebih canggih, berikut salah satu contoh linknya github.com/lotusirous/go-concurrency-patterns

Semoga membantu.