Menggunakan Fitur Concurrency di Go

Intro
Salah satu nilai jual dari bahasa Go adalah fitur concurrency, fitur ini sering menjadi alasan utama kebanyakan orang memilih Go. Diartikel ini kita akan membahas tool yang tersedia di Go untuk membuat kode yang concurrent, diantara adalah sebagai berikut.
- Goroutine
- Channel
- Select
- Package Sync
Concurrency Vs Paralelisme
Go mempunyai fitur concurrency secara native, tetapi terkadang ada kesalah-pahaman bahwa concurrency dan paralelisme adalah kedua hal yang sama, padahal kedua istilah ini memiliki makna yang berbeda.
concurrency adalah tentang bagaimana mengatur dan berurusan dengan banyak pekerjaan dalam satu waktu. Apa maksudnya ?
Pada dasarnya 1 CPU hanya bisa melakukan 1 pekerjaan dalam 1 waktu, sebagai contoh jika kita membuka banyak program sekaligus diperangkat yang memiliki 1 CPU, secara sederhana CPU tersebut akan berpindah-pindah (context switch) dengan cepat antara program 1 dengan program yang lain. hal ini menciptkan sebuah ilusi seolah-olah 1 CPU tersebut melakukan banyak pekerjaan secara sekaligus, padahal kenyataanya tidak. inilah yang disebut concurrency.
Dilain itu, paralel adalah beberapa pekerjaan dikerjakan secara bersamaan, ini mengharuskan perangkat yang digunakan mempunyai lebih dari 1 CPU.
Kode yang didesain concurrent bisa saja berjalan secara paralel (dilingkungan multi CPU), tetapi hal ini bukanlah keharusan.
Di Go, kita bisa menggunakan Goroutine untuk membuat program yang bersifat concurrent, fitur tersebut diharapkan membuat program yang kita buat menjadi lebih efektif, baik dilingkungan single CPU ataupun multi CPU. Tetapi yang perlu diingat adalah, menggunakan fitur concurrency bukan berarti program yang kita buat otomatis menjadi lebih cepat, ini sangat bergantung dari algoritma yang dibuat dan jumlah core yang dimiliki.
Apa itu Goroutine ?
Dibeberapa bahasa pemrograman kita bisa menggunakan OS threads (thread pada level sistem operasi) secara langsung untuk melakukan pekerjaan secara paralel, tetapi hal tersebut tidak terlalu efisien dari sisi performa dan memori karena perlunya berkomunikasi dengan resource dilevel sistem operasi. hal ini menjadi salah satu alasan utama dibuatnya fitur concurrency secara native didalam bahasa Go.
Goroutine adalah sebuah function yang dieksekusi secara independen dan dapat berjalan secara bersamaan (concurrent) dengan function lainnya. Goroutine ini mirip thread, bedanya goroutine dimanage oleh Go runtime, sedangkan thread dimanage oleh sistem operasi, hal ini yang membuat goroutine menjadi lebih ringan dan efisien. 1 Goroutine hanya memakan memori beberapa kilobyte saja, secara teknis membuat puluhan ribu goroutine bukanlah sebuah masalah.
Cara Menggunakan Goroutine
Untuk menjalankan sebuah Goroutine kita bisa menggunakan keyword Go
sebelum menjalankan sebuah function.
go doSomething()
ketika kita menggunakan keyword go
tersebut, function tersebut akan berjalan dibackground secara concurrent, yang artinya kita tidak perlu menunggu function tersebut selesai untuk melanjutkan eksekusi berikutnya.
berikut contoh kodenya secara lengkap.
package main
import (
"fmt"
"time"
)
func main() {
// jalankan 2 goroutine secara concurrent
go count("A")
go count("B")
fmt.Println("Main goroutine finished")
}
func count(s string) {
for i := 1; i <= 3; i++ {
time.Sleep(1 * time.Second)
fmt.Println(s, ": ", i)
}
}
Output
Main goroutine finished
dilihat dari output diatas, bisa diketahui bahwa program yang baru saja kita jalankan langsung exit begitu saja, padahal kita menjalankan 2 function count
dengan goroutine yang membutuhkan waktu sekitar 3 detik, hal ini karena jika goroutine utama berakhir (function main
), maka program akan keluar dan goroutine yang berjalan dibackground akan terhenti. mari kita ubah kode diatas menjadi sebagai berikut
package main
import (
"fmt"
"time"
)
func main() {
// Starting two goroutines
go count("A")
go count("B")
// Wait for goroutines to finish before main goroutine ends
time.Sleep(time.Second * 4)
fmt.Println("Main goroutine finished")
}
func count(s string) {
for i := 1; i <= 3; i++ {
time.Sleep(1 * time.Second)
fmt.Println(s, ": ", i)
}
}
Output
A : 1
B : 1
A : 2
B : 2
B : 3
A : 3
Main goroutine finished
setelah kita tambahkan fungsi sleep di main goroutine selama 4 detik, dari situ kita bisa melihat dua function count tadi dapat berjalan. dari outputnya kita juga bisa melihat output dari masing-masing function saling tumpang tindih, hal ini menandakan kedua function berjalan secara concurrent.
Berkomunikasi menggunakan Channel
ketika kita menggunakan goroutine, kita tidak bisa mengakses return value dari function tersebut. namun, ada saatnya kita membutuhkan 2 atau lebih goroutine untuk berkomunikasi/bertukar data. disituasi tersebut, kita bisa menggunakan channel sebagai media komunikasi.
untuk membuat channel kita dapat menggunakan function make()
seperti berikut
ch := make(chan int) //membuat channel dengan tipe data integer
untuk mengirim data ke channel kita bisa menggunakan code seperti berikut
ch <- b // kirim data dari variabel b ke channel ch
dan untuk menerima data kita seperti berikut
a := <-ch // terima data dari channel ch kemudian assign ke variable a
berikut contoh kodenya secara lengkap
package main
import (
"fmt"
)
func main() {
// buat channel bertipe string
msgChannel := make(chan string)
// eksekusi function dengan goroutine
go channelData(msgChannel)
// baris ini akan mengeblok eksekusi,
// sampai ada message yang diterima
fmt.Println("msgChannel Data:", <-msgChannel)
}
func channelData(msg chan string) {
//line ini akan mengeblok eksekusi sampai ada penerima yang mengambil message tersebut
msg <- "Belajar Go"
}
Output
Belajar Go
Pada kode diatas operasi menerima pesan <-msgChannel
akan bersifat blocking, sampai ada pesan yang diterima barulah eksekusi akan berlanjut, hal tersebut hanya berlaku diunbuffered channel yang akan kita bahas dibawah.
For-range dan channel
untuk menerima message dari channel kita juga bisa menggunakan keyword for
, terutama jika message yang dikirim berulang-kali dalam 1 channel
for v := range ch {
fmt.Println(v)
}
berikut contoh lengkap penggunaan for
dengan channel
package main
import (
"fmt"
"time"
)
func produce(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
close(ch)
}
func main() {
ch := make(chan int)
go produce(ch)
for val := range ch {
fmt.Println(val)
}
}
Output
0
1
2
3
4
jika kita melihat kode diatas, kita bisa menggunakan kode for val := range ch
untuk menerima message dari channel. Dalam setiap iterasi, variable val
akan menerima nilai berikutnya dari message yang dikirim channel ch
. iterasi akan terhenti jika channel ch
tersebut ditutup dengan function close(ch)
Closing Channel
Setelah kita selesai mengirim data ke sebuah channel, kita dapat menutup channel tersebut menggunakan function close(ch)
. Penutupan ini mengindikasikan bahwa tidak ada lagi value yang akan dikirim ke channel tersebut. Jika kita mencoba mengirim data ke channel yang telah ditutup, hal tersebut akan menimbulkan panic
.
ketika suatu channel ditutup, channel tersebut juga akan memberikan pesan ke penerima, kita bisa mengaksesnya seperti berikut
v, ok := <-ch
ketika variabel ok
bernilai false
, kita bisa menilai bahwa channel itu sudah ditutup, dan value yang diberikan hanyalah default value saja (zero value) yang biasanya tidak kita gunakan.
hal yang perlu diingat adalah hanya pengirim pesan pada channel (writer) yang mempunyai tanggung jawab untuk menutup channel dan kita hanya perlu menutup channel jika ada goroutine yang perlu tahu kapan suatu channel ditutup (semisal for-range).
Unbuffered dan Buffered Channel
Dibahasa Go ada 2 tipe channel yang tersedia
- Unbuffered Channel
- Buffered Channel
Secara default, saat kita membuat channel, tipe channel yang dihasilkan adalah Unbuffered. Apa artinya? Unbuffered channel merupakan channel tanpa kapasitas penyimpanan (buffer) untuk menyimpan pesan. Ini berarti setiap operasi pengiriman (send) akan menunggu hingga ada penerima (receive) yang tersedia. Jika tidak ada penerima yang siap, maka operasi pengiriman akan blocking. Perhatikan kode berikut.
package main
import "fmt"
func main() {
//buat unbuffered channel bertipe data string
ch := make(chan string)
//kirim message ke channel ch
ch <- "Learning Go channel"
//terima message dari channel ch dan print hasilnya
fmt.Println(<-ch)
}
Output
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/home/ibnu/personal/playground-go/main.go:9 +0x37
exit status 2
pada line ke 9 kita mengirim pesan ke unbuffered channel. Karena channel tersebut tidak mempunya kapasitas penyimpanan, operasi pengiriman tersebut akan menunggu sampai ada penerima yang siap menerima, namun karena operasi pengiriman tadi terblock, operasi penerimaan di line 11 juga tidak akan dieksekusi, inilah yang menyebabkan deadlock.
mari kita coba ubah channel tersebut menjadi buffered channel dengan kapasitas 1
package main
import "fmt"
func main() {
//buat buffered channel bertipe data string dengan kapasitar penyimpanan 1
ch := make(chan string, 1)
//kirim message ke channel ch
ch <- "Learning Go channel"
//terima message dari channel ch dan print hasilnya
fmt.Println(<-ch)
}
Output
Learning Go channel
dari output diatas bisa kita melihat bahwa buffered channel tidak blocking selama kapasitas penyimpanan masih tersedia.
mari kita coba kirim pesan lebih dari 1 kali seperti berikut
package main
import "fmt"
func main() {
//buat buffered channel bertipe data string dengan kapasitar penyimpanan 1
ch := make(chan string, 1)
//non blocking karena kapasitas channel masih tersedia
ch <- "Message 1"
//blocking karena channel penuh
ch <- "Message 2"
//terima message dari channel ch dan print hasilnya
fmt.Println(<-ch)
}
Output
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/home/ibnu/personal/playground-go/main.go:10 +0x4b
exit status 2
kode tersebut menjadi blocking/deadlock kembali karena pada line 11 kita mengirim pesan lagi yang membuat kapasitas channel penuh. Hal ini juga dikarenakan message pertama yang dikirim dari channel tersebut juga masih tersimpan.
Select
Ada kalanya ketika kita membuat program dengan Go, kita menggunakan banyak channel untuk berkomunikasi dengan banyak goroutine.
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch1 <- "gorutine 1 selesai dalam waktu 3 detik"
}()
go func() {
time.Sleep(1 * time.Second)
ch2 <- "gorutine 2 selesai dalam waktu 1 detik"
}()
fmt.Println(<-ch1)
fmt.Println(<-ch2)
}
Output
gorutine 1 selesai dalam waktu 3 detik
gorutine 2 selesai dalam waktu 1 detik
hal yang kurang sesuai dari program diatas adalah meskipun goroutine 2 selesai lebih awal kita tidak bisa menggunakan pesan dari goroutine 2 sebelum goroutine 1 selesai, hal ini bisa menjadi bottleneck yang cukup mengganggu, dari masalah tersebut kita bisa menggunakan select
keyword.
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch1 <- "gorutine 1 selesai dalam waktu 3 detik"
}()
go func() {
time.Sleep(1 * time.Second)
ch2 <- "gorutine 2 selesai dalam waktu 1 detik"
}()
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
Output
gorutine 2 selesai dalam waktu 1 detik
dari contoh diatas kita bisa menggunakan keyword select untuk menunggu pesan dari beberapa channel kemudian bereaksi terhadap channel yang pertama kali mengirim pesan.
Agar dapat memproses pesan dari channel selanjutnya kita juga bisa bisa menggunakan for-loop seperti berikut
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch1 <- "gorutine 1 selesai dalam waktu 3 detik"
}()
go func() {
time.Sleep(1 * time.Second)
ch2 <- "gorutine 2 selesai dalam waktu 1 detik"
}()
//loop 2 kali karena hanya ada 2 pesan yang diterima
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
Output
gorutine 2 selesai dalam waktu 1 detik
gorutine 1 selesai dalam waktu 3 detik
layaknya switch
kita juga bisa menggunakan default case jika tidak ada channel yang mengirim message dalam suatu waktu
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch1 <- "selesai dalam waktu 3 detik"
}()
go func() {
time.Sleep(1 * time.Second)
ch2 <- "selesai dalam waktu 1 detik"
}()
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
default:
fmt.Println("tidak ada channel yang siap mengirim data")
}
fmt.Println("program selesai")
}
Output
tidak ada channel yang siap mengirim data
program selesai
Sync package
Selain menggunakan channel dan select untuk sinkronasi di antara goroutine, Go juga menyediakan metode sinkronasi lain yang lebih primitive melalui package sync
.
Dilain itu yang perlu diperhatikan adalah salah satu pepatah dalam Go Proverb "Do not communicate by sharing memory. instead, share memory by communicating". Pepatah ini menunjukan bahwa sinkronasi antar goroutine sebaiknya dilakukan melalui channel. Namun, dalam beberapa situasi, menggunakan package sync bisa saja menjadi pilihan yang lebih tepat karena lebih simple dan performa yang lebih optimal.
Package sync ini mempunyai beberapa fungsi, namun kali ini kita akan membahasan hanya 2 fungsi saja yaitu sync.WaitGroup
dan sync.Mutex
sync.WaitGroup
sync.WaitGroup
ini sangat cocok ketika kita ingin menunggu beberapa goroutine sekaligus sampai selesai. Berikut contohnya
package main
import (
"fmt"
"sync"
"time"
)
func doSomeWork(id int) {
time.Sleep(time.Second)
fmt.Println("done", id)
}
func main() {
// buat sync.WaitGroup bernama wg
var wg sync.WaitGroup
// launch 5 goroutine
const numWorkers = 5
for i := 0; i < numWorkers; i++ {
// Increment wg counter setiap menambah 1 goroutine
wg.Add(1)
go func(id int) {
doSomeWork(id)
// setelah pekerjaan selesai decrement wg counter
wg.Done()
}(i)
}
// line ini akan blocking sampai wg counter bernilai 0
wg.Wait()
fmt.Println("All workers completed.")
}
Output
done 1
done 0
done 4
done 3
done 2
All goroutine completed.
Pada dasarnya sync.WaitGroup
adalah sebuah counter, ketika kita menjalankan sebuah gorutine kita mengeksekusi wg.Add(1)
untuk increment counter dan wg.Done()
untuk decrement counter setiap pekerjaan selesai. kemudian kita mengeksekusi wg.Wait()
yang akan blocking sampai counter bernilai 0 yang menandakan semua pekerjaan telah selesai dan eksekusi bisa dilanjutkan.
sync.Mutex
Ketika kita memiliki sebuah data yang bisa diakses dan dimodikasi oleh banyak goroutine secara bersamaan, hal ini dapat memunculkan masalah. perhatikan kode dibawah
package main
import (
"fmt"
"sync"
)
type UnSafeCounter struct {
v int
}
func (c *UnSafeCounter) Increment() {
c.v++
}
func (c *UnSafeCounter) Value() int {
return c.v
}
func main() {
var c UnSafeCounter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Increment()
wg.Done()
}()
}
wg.Wait()
fmt.Println(c.Value())
}
Output
995
Dari contoh di atas, meskipun kita mengincrement counter sebanyak 1000 kali, output yang dihasilkan bisa jadi tidak 1000. Kasus ini biasa disebut "race condition"
Ketika banyak goroutine mencoba mengakses dan memodifikasi suatu variabel secara bersamaan tanpa koordinasi yang tepat, mereka bisa "menimpa" perubahan yang dilakukan oleh goroutine lain.
Misalnya, jika dua goroutine membaca nilai v
pada saat yang sama (misalnya v
adalah 5). dua goroutin tadi akan menghasilkan nilai increment akhir yaitu 6, yang semestinya bernilai 7.
Dari kasus diatas maka diperlukanlah mekanisme locking, dimana dalam 1 waktu hanya 1 gorutine yang boleh melakukan increment, di Go kita bisa menggunakan sync.Mutex. Mari kita coba ubah kode counter tadi menjadi concurrent safe
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
v int
mux sync.Mutex //tambahkan 1 property sync.Mutex
}
func (c *SafeCounter) Increment() {
//kunci counter agar hanya 1 goroutine yang dapat mengakses dalam 1 waktu
c.mux.Lock()
c.v++
//buka kunci setelah proses selesai
c.mux.Unlock()
}
func (c *SafeCounter) Value() int {
c.mux.Lock()
defer c.mux.Unlock()
return c.v
}
func main() {
var c SafeCounter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Increment()
wg.Done()
}()
}
wg.Wait()
fmt.Println(c.Value())
}
Output
1000
setelah kita menambah sync.Mutex
ke counter tersebut dan menggunakan Lock()
dan Unlock()
sebelum mengakses dan merubah data, kita bisa memastikan data counter yang dihasilkan benar dan selalu konsisten meskipun ada 1000 goroutine yang melakukan increment secara concurrent.
Penutup
Setelah memahami dasar-dasar concurrency di Go yaitu goroutine, channel, select, dan sync package, diharapkan kita dapat membuat program yang lebih efektif dilingkungan multi CPU.
Langkah berikutnya kamu juga bisa mempelajari pola atau pattern concurrency yang lebih canggih, berikut salah satu contoh linknya github.com/lotusirous/go-concurrency-patterns
Semoga membantu.