ゴルーチン(goroutine)とチャネルを使った並行処理の実践例|Go言語
Go言語(golang)の特徴の1つに、ゴルーチン(goroutine)を使った並行処理があります。この記事では、ゴルーチンと、チャネルを使ったゴルーチン間の通信方法について、いくつかのパターンを例にしながら解説します。
ゴルーチン(goroutine)とは
goroutineとは
ゴルーチン(goroutine)は、Go言語の並行処理の基本単位です。通常の関数呼び出しとは異なり、呼び出された関数は並行に処理されます。
ゴルーチンは軽量で、Goランタイムにより効率的にスケジューリングされます。また、Goランタイムはゴルーチンを複数のプロセッサコアに分配することも可能です。これにより、コアが複数ある環境ではゴルーチンによる並列処理が行われます。
ゴルーチンのスケジューリングをGoランタイムが行なってくれるので、プログラマ側が「プロセッサへの割り当て」を考えずにプログラミングできるのは大きな利点です。
このように、Goは言語の機能として並行処理をサポートしているため、他の言語に比較しても手軽に並行処理を記述することが可能です。
とはいえ、並行処理はやはり少し面倒です。
この記事では、ゴルーチンを使った並行処理と、ゴルーチン間の同期方法について、いくつかのパターンを使って紹介します。基本的なパターンはおさえたつもりです。ここに紹介するパターンだけでも基本的な並行処理は記述できるようになるかと思います。
for文の繰り返しにゴルーチンを利用する例は以下の記事を参考にしてください。
使い方(基本)
ゴルーチンの呼び出しは簡単です。関数の呼び出し時にgo
というキーワードをつけるだけです。これだけで、関数を並行処理させることが可能です。
go 関数名()
goroutineに関連する機能(通信・同期など)
Go言語では、ゴルーチンの管理・同期に便利な機能がいくつか提供されています。ここではそれらについて、簡単に説明します。
チャネル(channel)
チャネルは、 ゴルーチン間でデータを送受信するための機能です。詳しくは、以下の記事を参照してください。
waitGroup
sync.WaitGroup
は、ゴルーチンが完了するのを待つための仕組みです。これを利用することで、ゴルーチンが完了するまでmain
の処理を待たせることができます。主に、同期に利用します。
select
select
文は、複数のチャネルを待ち受けて、(データの)準備ができたチャネルに対して操作を実行するために使います。複数のゴルチーンからのメッセージを受け付けて、メッセージが届いたら処理を行う場合などに利用します。
ゴルーチンを利用するプログラム例
送信側・受信側のゴルーチン間でデータを受け渡す
送信側send
関数から受信側recv
関数へデータを受け渡す例です。受信側では10
を受け取ったら終了します。
プログラムでは、チャネルを使ってデータを送受信しています。また、main
関数ではwg.Wait()
でsend/recv
の双方が終了するのを待っています。
このような記述を行うことで、送信側と受信側を同時に起動し、チャネルを使ってデータのやり取りを行うことが可能です。
このコードで、所謂、「生産者・消費者」を実現することができます
このパターンは、片方のゴルーチンがデータを受け取ったり読み込んだりして、もう1つのゴルーチンで処理するといったパターンに使うことが可能です。複数プロセッサが利用できる場合は、読み込みと処理を同時に行えるので処理を高速化可能です。
package main
import (
"fmt"
"sync"
"time"
)
func send(ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 10; i++ {
ch <- i
time.Sleep(time.Millisecond * 20)
}
fmt.Println("send end")
}
func recv(ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for true {
x := <-ch
fmt.Println("recv = ", x)
if x == 10 {
fmt.Println("recv end")
break
}
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(2)
go send(ch, &wg)
go recv(ch, &wg)
wg.Wait()
fmt.Println("main end")
}
実行結果
recv = 1
recv = 2
recv = 3
recv = 4
recv = 5
recv = 6
recv = 7
recv = 8
recv = 9
recv = 10
recv end
send end
main end
処理が終わるまで待つ(受け取るデータ数があらかじめ決まっている場合)
次の例は、あらかじめ受け取るデータ数が10と決まっている場合の例です。
main
関数では、先にgo proc(...)
で、ゴルーチンを10個起動しwg.Wait()
でゴルーチンが終了したのを確認してから、チャネル経由でデータを受け取っています。
注意する点は、ch := make(chan int, n)
というように、あらかじめn個のバッファを用意する必要があることです。バッファを用意しないと、チャネルがいっぱいになってしまいデットロックが発生します(全てのゴルーチンがチャネルにデータを送った後しか、チャネルからデータを取り出さないため、起動するゴルーチン分(n個)のバッファが必要となる)
受け取り側も、受け取る量がわかっているので、10個受け取ったら処理を終了しています。
このパターンは、計算処理を並列で行い、後で結果を集計する場合などに利用できます。
package main
import (
"fmt"
"math/rand"
"sort"
"sync"
"time"
)
func proc(n int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
l := rand.Int()%5 + 1
fmt.Println("Proc start ", n, "loop = ", l)
time.Sleep(time.Second * time.Duration(rand.Int63()%3))
ch <- n * n
fmt.Println("Proc end ", n)
}
func main() {
var wg sync.WaitGroup
const n = 10
ch := make(chan int, n)
for i := 0; i < n; i++ {
wg.Add(1)
go proc(i, ch, &wg)
}
wg.Wait()
a := []int{}
for i := 0; i < n; i++ {
x := <-ch
fmt.Println(x)
a = append(a, x)
}
sort.Ints(a)
fmt.Println(a)
}
実行結果
Proc start 4 loop = 2
Proc end 4
Proc start 0 loop = 4
Proc start 5 loop = 1
Proc start 3 loop = 5
Proc start 6 loop = 4
Proc start 8 loop = 2
Proc end 6
Proc start 1 loop = 1
Proc end 8
Proc start 2 loop = 4
Proc end 2
Proc start 9 loop = 2
Proc end 9
Proc start 7 loop = 4
Proc end 5
Proc end 0
Proc end 7
Proc end 1
Proc end 3
16
36
64
4
81
25
0
49
9
1
[0 1 4 9 16 25 36 49 64 81]
処理を依頼し、結果を受けとる(受け取るデータ数が未知の場合)
受け取るデータ数が決まっていない場合の例です。このパターンに当てはまる処理は結構多いと思います。
並行処理を記述する場合は、ある程度テンプレート化しておいた方がバグらずに済みます。このパターンは、私が並行処理を行う場合によく利用するもので、大きなデータを分割して処理し、結果をまとめるときなどに利用しています。
この例ではproc
はランダム個(1〜5個)のデータをチャネルに送信します。
終了待ち
main
関数では、ゴルーチンを起動した後、以下のようなゴルーチンを1つ起動しています。このゴルーチンは送信側が完了した時点でチャネルを閉じるというものです。
go func() {
wgSend.Wait()
close(ch)
}()
処理データの受け取り
データの受け取りはゴルーチンで行なっています。この関数ではselect
を使ってチャネルから受信しています。ok
は、チャネルがcloseするとfalse
となります。つまり、この関数はチャネルが閉じられるまでデータを受け取り、チャネルが閉じられると結果をソートして返すというものです。
go func() {
defer wgRecv.Done()
a := []int{}
for {
select {
case x, ok := <-ch:
if ok == false {
fmt.Println("recv end")
sort.Ints(a)
fmt.Println(a)
return
}
fmt.Println("recv <- ", x)
a = append(a, x)
}
}
}()
なお、送信しながら受け取っているので、チャネルのバッファサイズは少なくても構いません。この例ではch := make(chan int, 5)
と5つに設定していますが最後まで動作します。
プログラム全体
package main
import (
"fmt"
"math/rand"
"sort"
"sync"
"time"
)
func proc(n int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
l := rand.Int()%5 + 1
fmt.Println("Proc start ", n, "loop = ", l)
for i := 0; i < l; i++ {
time.Sleep(time.Second * time.Duration(rand.Int63()%3))
ch <- n
}
fmt.Println("Proc end ", n)
}
func main() {
var wgSend sync.WaitGroup
const n = 10
ch := make(chan int, 5)
for i := 0; i < n; i++ {
wgSend.Add(1)
go proc(i, ch, &wgSend)
}
go func() {
wgSend.Wait()
close(ch)
}()
var wgRecv sync.WaitGroup
wgRecv.Add(1)
go func() {
defer wgRecv.Done()
a := []int{}
for {
select {
case x, ok := <-ch:
if ok == false {
fmt.Println("recv end")
sort.Ints(a)
fmt.Println(a)
return
}
fmt.Println("recv <- ", x)
a = append(a, x)
}
}
}()
wgRecv.Wait()
}
実行すると、以下のようになります。
最後の出力[0 0 0 0 0 1 1 1 1 1 2 2 2 2 3 4 4 4 5 5 5 6 6 6 6 6 7 8 8 9]を見ると、loop
回数分だけそれぞれのProcに渡された番号が繰り返されていることがわかります(例えば、Proc start 0 loop = 5となっているため、0は5つ繰り返されている)
実行結果
Proc start 6 loop = 5
Proc start 5 loop = 3
Proc end 5
Proc start 0 loop = 5
Proc start 3 loop = 1
Proc start 2 loop = 4
Proc start 7 loop = 1
Proc start 1 loop = 5
Proc start 9 loop = 1
Proc end 9
Proc start 4 loop = 3
Proc start 8 loop = 2
recv <- 6
recv <- 5
recv <- 5
recv <- 5
recv <- 9
recv <- 4
recv <- 4
recv <- 8
recv <- 0
Proc end 8
recv <- 8
recv <- 0
recv <- 1
recv <- 1
recv <- 1
recv <- 1
recv <- 6
recv <- 0
recv <- 3
Proc end 1
recv <- 1
Proc end 7
recv <- 7
recv <- 2
recv <- 4
Proc end 4
Proc end 3
recv <- 6
recv <- 6
Proc end 6
recv <- 2
recv <- 6
recv <- 0
Proc end 0
recv <- 0
recv <- 2
Proc end 2
recv <- 2
recv end
[0 0 0 0 0 1 1 1 1 1 2 2 2 2 3 4 4 4 5 5 5 6 6 6 6 6 7 8 8 9]
まとめ
ゴルーチンの使い方について記事にしてみました。記述自体は簡単ですが、並行処理自体は結構難しく、ちょっとミスるとデットロックなどが発生してしまいます。今回は、よく使うパターンを簡単にまとめてみました。
最後の例は、大きな処理を複数のゴルーチンに分けて処理し、最後に結果をまとめる場合などに多用するパターンです。