title: 【転載】Go 言語基礎の並行処理
date: 2021-08-09 16:39:33
comment: false
toc: true
category:
- Golang
tags: - 転載
- Go
- 基礎
- 並行
この記事は、Go 言語基礎の並行処理 | 李文周のブログから転載されています。
並行処理はプログラミングにおいて非常に重要な概念であり、Go 言語は言語レベルで並行処理を自然にサポートしています。これが Go 言語が人気である大きな理由の一つです。
Go 言語における並行プログラミング#
並行と並列#
並行:同じ時間内に複数のタスクを実行すること(あなたが WeChat で 2 人の彼女とチャットしている状況)。
並列:同じ瞬間に複数のタスクを実行すること(あなたと友達がそれぞれ WeChat で彼女とチャットしている状況)。
Go 言語の並行処理はgoroutine
によって実現されます。goroutine
はスレッドに似ており、ユーザーレベルのスレッドです。必要に応じて数千のgoroutine
を作成して並行に動作させることができます。goroutine
は Go 言語のランタイム(runtime)によってスケジュールされ、スレッドはオペレーティングシステムによってスケジュールされます。
Go 言語はまた、複数のgoroutine
間で通信を行うためのchannel
を提供しています。goroutine
とchannel
は、Go 言語が採用している CSP(Communicating Sequential Process)並行モデルの重要な実装基盤です。
goroutine#
Java/C++ で並行プログラミングを実現する場合、通常は自分でスレッドプールを管理し、タスクをラップし、スレッドをスケジュールしてコンテキストスイッチを維持する必要があります。これらは通常、プログラマーにとって多くの精神的負担を伴います。では、プログラマーが多くのタスクを定義し、システムがこれらのタスクを CPU に割り当てて並行実行を実現するメカニズムはないのでしょうか?
Go 言語のgoroutine
はそのようなメカニズムです。goroutine
の概念はスレッドに似ていますが、goroutine
は Go のランタイム(runtime)によってスケジュールされ管理されます。Go プログラムは、goroutine
内のタスクを各 CPU に合理的に割り当てることができます。Go 言語が現代的なプログラミング言語と呼ばれる理由は、言語レベルでスケジューリングとコンテキストスイッチのメカニズムが組み込まれているからです。
Go 言語のプログラミングでは、プロセス、スレッド、コルーチンを書く必要はありません。あなたのスキルセットにはgoroutine
という 1 つのスキルしかありません。特定のタスクを並行に実行したい場合、そのタスクを関数としてラップし、goroutine
を開始してその関数を実行するだけで済みます。これほど簡単で直接的です。
goroutine の使用#
Go 言語でgoroutine
を使用するのは非常に簡単で、関数を呼び出す際に前にgo
キーワードを追加するだけで、その関数に対してgoroutine
を作成できます。
1 つのgoroutine
は必ず 1 つの関数に対応し、同じ関数を実行するために複数のgoroutine
を作成できます。
単一の goroutine を起動する#
goroutine
を起動する方法は非常に簡単で、呼び出す関数(通常の関数や無名関数)の前にgo
キーワードを追加するだけです。
以下の例を見てみましょう:
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
この例では、hello 関数と次の文は直列に実行され、結果はHello Goroutine!
が印刷された後にmain goroutine done!
が印刷されます。
次に、hello 関数の前にgo
キーワードを追加して、hello 関数を実行するためにgoroutine
を起動します。
func main() {
go hello() // 別のgoroutineを起動してhello関数を実行
fmt.Println("main goroutine done!")
}
この実行結果は、main goroutine done!
だけが印刷され、Hello Goroutine!
は印刷されません。なぜでしょうか?
プログラムが起動すると、Go プログラムはmain()
関数のためにデフォルトのgoroutine
を作成します。
main()
関数が戻ると、そのgoroutine
は終了し、main()
関数内で起動されたすべてのgoroutine
も一緒に終了します。main
関数が存在するgoroutine
は、まるで「ゲーム・オブ・スローンズ」の夜の王のようで、他のgoroutine
はすべて異鬼です。夜の王が死ぬと、彼が変換した異鬼もすべて死んでしまいます。
したがって、main
関数が hello 関数を待つようにする必要があります。最も簡単で直接的な方法はtime.Sleep
です。
func main() {
go hello() // 別のgoroutineを起動してhello関数を実行
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
上記のコードを実行すると、今回は最初にmain goroutine done!
が印刷され、その後すぐにHello Goroutine!
が印刷されることがわかります。
最初にmain goroutine done!
が印刷される理由は、新しい goroutine を作成するのに時間がかかるため、その間に main 関数のあるgoroutine
は実行を続けます。
複数の goroutine を起動する#
Go 言語で並行処理を実現するのはこのように簡単です。さらに、複数のgoroutine
を起動することもできます。もう一つの例を見てみましょう:(ここではsync.WaitGroup
を使用して goroutine の同期を実現しています)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutineが終了したら-1を記録
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // goroutineを起動するたびに+1を記録
go hello(i)
}
wg.Wait() // 登録されたすべてのgoroutineが終了するのを待つ
}
上記のコードを何度も実行すると、印刷される数字の順序が毎回異なることがわかります。これは、10 個のgoroutine
が並行に実行され、goroutine
のスケジューリングがランダムであるためです。
goroutine とスレッド#
可変サイズのスタック#
OS スレッド(オペレーティングシステムスレッド)は通常、固定のスタックメモリ(通常は 2MB)を持ちますが、goroutine
のスタックはそのライフサイクルの開始時に非常に小さなスタック(典型的には 2KB)を持ちます。goroutine
のスタックは固定ではなく、必要に応じて増減することができ、goroutine
のスタックサイズ制限は 1GB に達することができますが、これほど大きなサイズを使用することは非常に稀です。したがって、Go 言語では 10 万程度のgoroutine
を一度に作成することも可能です。
goroutine のスケジューリング#
GPM
は Go 言語のランタイム(runtime)レベルの実装であり、Go 言語が独自に実装したスケジューリングシステムです。OS スレッドのスケジューリングとは異なります。
G
は goroutine を表し、その中には本 goroutine の情報と、所属する P とのバインディング情報が含まれています。P
は一群の goroutine キューを管理し、P の中には現在の goroutine の実行コンテキスト(関数ポインタ、スタックアドレスおよびアドレス境界)が保存されます。P は自分が管理する goroutine キューのスケジューリングを行い(例えば、CPU 時間を長く占有している goroutine を一時停止し、次の goroutine を実行するなど)、自分のキューが消費されると、グローバルキューから取得します。グローバルキューも消費されると、他の P のキューからタスクを奪います。M(machine)
は Go ランタイム(runtime)がオペレーティングシステムのカーネルスレッドを仮想化したもので、M とカーネルスレッドは一般的に 1 対 1 のマッピング関係にあります。最終的に 1 つの goroutine は M 上で実行されます。
P と M は一般的に 1 対 1 で対応しています。彼らの関係は、P が一群の G を管理し、M 上で実行されることです。ある G が M 上で長時間ブロックされていると、ランタイムは新しい M を作成し、ブロックされた G が所属する P は他の G を新しい M に割り当てます。古い G がブロックを解除するか、死んだと見なされると、古い M は回収されます。
P の数はruntime.GOMAXPROCS
で設定されます(最大 256)。Go1.5 バージョン以降はデフォルトで物理スレッド数になります。並行量が多い場合は、いくつかの P と M を増やしますが、あまり多くはありません。切り替えが頻繁すぎると、逆効果になります。
スレッドスケジューリングの観点から、Go 言語は他の言語に比べて優位性があります。OS スレッドは OS カーネルによってスケジュールされますが、goroutine
は Go ランタイム(runtime)の独自のスケジューラによってスケジュールされます。このスケジューラは、mスケジューリング技術(m 個の goroutine を n 個の OS スレッドに再利用 / スケジューリング)を使用しています。大きな特徴は、goroutine のスケジューリングがユーザーレベルで完了し、カーネルレベルとユーザーレベルの間の頻繁な切り替えを含まず、メモリの割り当てと解放もユーザーレベルで大きなメモリプールを維持し、システムの malloc 関数を直接呼び出さない(メモリプールが変更される場合を除く)ため、OS スレッドのスケジューリングよりもコストが大幅に低くなります。さらに、マルチコアのハードウェアリソースを十分に活用し、複数の goroutine を物理スレッドに均等に分配し、goroutine 自体が非常に軽量であるため、これらすべてが Go のスケジューリング性能を保証しています。
GOMAXPROCS#
Go ランタイムのスケジューラは、GOMAXPROCS
パラメータを使用して、同時に Go コードを実行するために使用する OS スレッドの数を決定します。デフォルト値はマシンの CPU コア数です。たとえば、8 コアのマシンでは、スケジューラは Go コードを同時に 8 つの OS スレッドにスケジュールします(GOMAXPROCS は mスケジューリングの n です)。
Go 言語では、runtime.GOMAXPROCS()
関数を使用して、現在のプログラムの並行処理で占有する CPU 論理コア数を設定できます。
Go1.5 バージョン以前は、デフォルトで単一コアで実行されていました。Go1.5 バージョン以降は、デフォルトで全ての CPU 論理コア数を使用します。
タスクを異なる CPU 論理コアに割り当てることで並列処理を実現できます。以下に例を示します:
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
2 つのタスクは 1 つの論理コアしかないため、この場合は 1 つのタスクを終えてから次のタスクを実行します。論理コア数を 2 に設定すると、2 つのタスクが並行に実行されます。コードは以下の通りです。
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(time.Second)
}
Go 言語におけるオペレーティングシステムスレッドと goroutine の関係は以下の通りです:
- 1 つのオペレーティングシステムスレッドはユーザーレベルの複数の goroutine に対応します。
- Go プログラムは複数のオペレーティングシステムスレッドを同時に使用できます。
- goroutine と OS スレッドは多対多の関係、すなわち mです。
channel#
単に関数を並行に実行するだけでは意味がありません。関数間でデータを交換する必要があります。
共有メモリを使用してデータを交換することもできますが、異なるgoroutine
間で共有メモリを使用すると競合状態が発生しやすくなります。データ交換の正確性を保証するためには、ミューテックスを使用してメモリにロックをかける必要がありますが、この方法は必然的に性能問題を引き起こします。
Go 言語の並行モデルは CSP(Communicating Sequential Processes)であり、通信を通じてメモリを共有することを提唱しています。共有メモリを通じて通信を実現するのではありません。
goroutine
が Go プログラムの並行実行の実体であるとすれば、channel
はそれらの間の接続です。channel
は、あるgoroutine
が特定の値を別のgoroutine
に送信するための通信メカニズムです。
Go 言語のチャネル(channel)は特別な型です。チャネルはコンベアベルトやキューのようなもので、常に先入先出(First In First Out)のルールに従い、データの送受信の順序を保証します。各チャネルは特定の型の導管であり、チャネルを宣言する際にはその要素の型を指定する必要があります。
channel の型#
channel
は型の一種であり、参照型です。チャネル型を宣言する形式は以下の通りです:
var 変数 chan 要素型
いくつかの例を示します:
var ch1 chan int // 整数型を渡すチャネルを宣言
var ch2 chan bool // ブール型を渡すチャネルを宣言
var ch3 chan []int // intスライスを渡すチャネルを宣言
channel の作成#
チャネルは参照型であり、チャネル型の空値はnil
です。
var ch chan int
fmt.Println(ch) // <nil>
宣言されたチャネルは、make
関数を使用して初期化した後でなければ使用できません。
チャネルを作成する形式は以下の通りです:
make(chan 要素型, [バッファサイズ])
チャネルのバッファサイズはオプションです。
いくつかの例を示します:
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)
channel の操作#
チャネルには送信(send)、受信(receive)、および閉じる(close)の 3 つの操作があります。
送信と受信は両方とも<-
記号を使用します。
まず、以下の文を使用してチャネルを定義します:
ch := make(chan int)
送信#
値をチャネルに送信します。
ch <- 10 // 10をchに送信
受信#
チャネルから値を受信します。
x := <- ch // chから値を受信し、変数xに代入
<-ch // chから値を受信し、結果を無視
閉じる#
内蔵のclose
関数を呼び出してチャネルを閉じます。
close(ch)
チャネルを閉じる際に注意すべきことは、受信側の goroutine にすべてのデータが送信完了したことを通知したときにのみチャネルを閉じる必要があるということです。チャネルはガベージコレクションメカニズムによって回収されることができ、ファイルを閉じるのとは異なります。操作を終了した後にファイルを閉じることは必ず行う必要がありますが、チャネルを閉じることは必須ではありません。
閉じたチャネルには以下の特徴があります:
- 閉じたチャネルに値を送信すると panic が発生します。
- 閉じたチャネルから受信すると、チャネルが空になるまで値を取得し続けます。
- 値がない閉じたチャネルから受信操作を行うと、対応する型のゼロ値が得られます。
- すでに閉じたチャネルを再度閉じると panic が発生します。
バッファなしのチャネル#
バッファなしのチャネルはブロッキングチャネルとも呼ばれます。以下のコードを見てみましょう:
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("送信成功")
}
上記のコードはコンパイルを通過しますが、実行時に以下のエラーが発生します:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54
なぜdeadlock
エラーが発生するのでしょうか?
ch := make(chan int)
で作成したのはバッファなしのチャネルであり、バッファなしのチャネルは誰かが値を受信するまで値を送信できません。あなたが住んでいるマンションに宅配ボックスや代行受取所がない場合、配達員はあなたに電話をかけて、物をあなたの手に届けなければなりません。簡単に言えば、バッファなしのチャネルは受信がなければ送信できません。
上記のコードはch <- 10
の行でブロックされ、デッドロックが発生します。では、この問題をどう解決すればよいのでしょうか?
1 つの方法は、goroutine
を起動して値を受信することです。例えば:
func recv(c chan int) {
ret := <-c
fmt.Println("受信成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // goroutineを起動してチャネルから値を受信
ch <- 10
fmt.Println("送信成功")
}
バッファなしのチャネルでの送信操作は、別のgoroutine
がそのチャネルで受信操作を実行するまでブロックされます。このとき、値は正常に送信され、2 つのgoroutine
は実行を続けます。逆に、受信操作が先に実行されると、受信側の goroutine はブロックされ、別のgoroutine
がそのチャネルで値を送信するまで待機します。
バッファなしのチャネルを使用して通信を行うと、送信と受信のgoroutine
が同期化されます。したがって、バッファなしのチャネルは同期チャネル
とも呼ばれます。
バッファ付きのチャネル#
上記の問題を解決するもう 1 つの方法は、バッファ付きのチャネルを使用することです。make
関数を使用してチャネルを初期化する際に、チャネルの容量を指定できます。例えば:
func main() {
ch := make(chan int, 1) // 容量1のバッファ付きチャネルを作成
ch <- 10
fmt.Println("送信成功")
}
チャネルの容量が 0 より大きければ、そのチャネルはバッファ付きのチャネルです。チャネルの容量は、チャネル内に格納できる要素の数を示します。あなたのマンションの宅配ボックスが特定の数の格子を持っているように、格子が満杯になると、もう入らなくなり、ブロックされます。他の人が荷物を取り出すと、配達員はその中に 1 つを入れることができます。
内蔵のlen
関数を使用してチャネル内の要素の数を取得し、cap
関数を使用してチャネルの容量を取得できますが、私たちはあまりこれを行いません。
for range でチャネルから値をループして取得#
チャネルにデータを送信し終わったら、close
関数を使用してチャネルを閉じることができます。
チャネルが閉じられると、そのチャネルに値を送信するとpanic
が発生し、そのチャネルから値を取得する操作はチャネル内の値をすべて取得した後、次に取得される値は常に対応する型のゼロ値になります。では、チャネルが閉じられたかどうかを判断するにはどうすればよいのでしょうか?
以下の例を見てみましょう:
// channel 練習
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// goroutineを起動して0〜100の数をch1に送信
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// goroutineを起動してch1から値を受信し、その値の平方をch2に送信
go func() {
for {
i, ok := <-ch1 // チャネルが閉じた後に値を取得するとok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// メインgoroutineでch2から値を受信して印刷
for i := range ch2 { // チャネルが閉じた後にfor rangeループを終了
fmt.Println(i)
}
}
上記の例から、値を受信する際にチャネルが閉じられたかどうかを判断する 2 つの方法があることがわかりますが、通常はfor range
の方法を使用します。for range
を使用してチャネルを反復処理すると、チャネルが閉じられたときにfor range
を終了します。
一方向チャネル#
時には、チャネルをパラメータとして複数のタスク関数間で渡すことがありますが、多くの場合、異なるタスク関数でチャネルを使用する際に制限を加えます。例えば、関数内でチャネルを送信専用または受信専用に制限することです。
Go 言語では、このような状況を処理するために一方向チャネルを提供しています。例えば、上記の例を次のように改造します:
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
ここで、
chan<- int
は書き込み専用の一方向チャネル(int 型値の書き込みのみ可能)で、送信操作を実行できますが、受信操作は実行できません。<-chan int
は読み取り専用の一方向チャネル(int 型値の読み取りのみ可能)で、受信操作を実行できますが、送信操作は実行できません。
関数の引数や任意の代入操作で双方向チャネルを一方向チャネルに変換できますが、その逆はできません。
チャネルのまとめ#
channel
の一般的な異常のまとめは以下の通りです:
すでに閉じたchannel
を閉じると panic が発生します。
ワーカープール(goroutine プール)#
作業中に、起動する goroutine の数を指定できるworker pool
パターンを使用して、goroutine
の数を制御し、goroutine
のリークや暴走を防ぐことがよくあります。
簡易的なwork pool
のサンプルコードは以下の通りです:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 3つのgoroutineを起動
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5つのタスク
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 結果を出力
for a := 1; a <= 5; a++ {
<-results
}
}
select による多重化#
特定のシーンでは、複数のチャネルから同時にデータを受信する必要があります。チャネルがデータを受信する際、受信できるデータがない場合はブロックされます。以下のようにループを使用して実装することもできます:
for{
// ch1から値を受信しようとする
data, ok := <-ch1
// ch2から値を受信しようとする
data, ok := <-ch2
…
}
この方法は複数のチャネルから値を受信するニーズを実現できますが、実行性能は大幅に低下します。このようなシーンに対処するために、Go は内蔵のselect
キーワードを提供し、複数のチャネルの操作に同時に応答できます。
select
の使用は switch 文に似ており、一連の case 分岐とデフォルトの分岐があります。各 case はチャネルの通信(受信または送信)プロセスに対応します。select
は常に待機し、あるcase
の通信操作が完了すると、そのcase
分岐に対応する文を実行します。具体的な形式は以下の通りです:
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
デフォルト操作
}
select
の使用を示す小さな例を見てみましょう:
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
select
文を使用すると、コードの可読性が向上します。
- 1 つまたは複数のチャネルの送信 / 受信操作を処理できます。
- 複数の
case
が同時に満たされる場合、select
はランダムに 1 つを選択します。 case
がないselect{}
は常に待機し、main 関数をブロックするのに使用できます。
並行安全とロック#
時には、Go コード内で複数のgoroutine
が同時にリソース(クリティカルセクション)を操作することがあり、この状況では競合状態
(データ競合)が発生する可能性があります。現実生活の例に例えると、十字路で各方向の車が競争している状況や、電車のトイレが車両内の人々によって競争されている状況です。
以下に例を示します:
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上記のコードでは、2 つのgoroutine
を起動して変数 x の値を加算していますが、これらのgoroutine
が x 変数にアクセスして変更する際にデータ競合が発生し、最終的な結果が期待通りでなくなる可能性があります。
ミューテックス#
ミューテックスは共有リソースへのアクセスを制御するための一般的な方法であり、同時に 1 つのgoroutine
のみが共有リソースにアクセスできることを保証します。Go 言語では、sync
パッケージのMutex
型を使用してミューテックスを実装します。上記のコードの問題を修正するためにミューテックスを使用します:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // ロックをかける
x = x + 1
lock.Unlock() // ロックを解除
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
ミューテックスを使用すると、同時に 1 つのgoroutine
のみがクリティカルセクションに入ることが保証され、他のgoroutine
はロックを待機します。ミューテックスが解放されると、待機しているgoroutine
がロックを取得してクリティカルセクションに入ることができます。複数のgoroutine
が同じロックを待機している場合、起床の戦略はランダムです。
読み書きミューテックス#
ミューテックスは完全に排他的ですが、多くの実際のシーンでは読み取りが多く書き込みが少ない場合があります。リソースを変更しない場合に並行してリソースを読み取る際には、ロックをかける必要はありません。このようなシーンでは、読み書きロックを使用する方が良い選択です。Go 言語では、sync
パッケージのRWMutex
型を使用して読み書きロックを実装します。
読み書きロックには 2 種類があります:読み取りロックと書き込みロックです。ある goroutine が読み取りロックを取得すると、他の goroutine が読み取りロックを取得することができますが、書き込みロックを取得する場合は待機します。あるgoroutine
が書き込みロックを取得すると、他のgoroutine
は読み取りロックでも書き込みロックでも待機します。
読み書きロックの例:
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // ミューテックスをかける
rwlock.Lock() // 書き込みロックをかける
x = x + 1
time.Sleep(10 * time.Millisecond) // 読み取り操作が10ミリ秒かかると仮定
rwlock.Unlock() // 書き込みロックを解除
// lock.Unlock() // ミューテックスを解除
wg.Done()
}
func read() {
// lock.Lock() // ミューテックスをかける
rwlock.RLock() // 読み取りロックをかける
time.Sleep(time.Millisecond) // 読み取り操作が1ミリ秒かかると仮定
rwlock.RUnlock() // 読み取りロックを解除
// lock.Unlock() // ミューテックスを解除
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
読み書きロックは、読み取りが多く書き込みが少ないシーンに非常に適しています。読み取りと書き込みの操作の差があまりない場合、読み書きロックの利点は発揮されません。
sync.WaitGroup#
コード内で無理にtime.Sleep
を使用するのは適切ではありません。Go 言語では、sync.WaitGroup
を使用して並行タスクの同期を実現できます。 sync.WaitGroup
には以下のメソッドがあります:
メソッド名 | 機能 |
---|---|
(wg * WaitGroup) Add(delta int) | カウンター + delta |
(wg *WaitGroup) Done() | カウンター - 1 |
(wg *WaitGroup) Wait() | カウンターが 0 になるまでブロック |
sync.WaitGroup
は内部でカウンターを維持しており、カウンターの値は増減できます。たとえば、N 個の並行タスクを起動した場合、カウンターの値を N だけ増やします。各タスクが完了すると、Done()
メソッドを呼び出してカウンターを 1 減らします。Wait()
を呼び出して並行タスクの実行が完了するのを待ちます。カウンターの値が 0 になると、すべての並行タスクが完了したことを示します。
sync.WaitGroup
を使用して上記のコードを最適化します:
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 別のgoroutineを起動してhello関数を実行
fmt.Println("main goroutine done!")
wg.Wait()
}
sync.WaitGroup
は構造体であり、渡すときはポインタを渡す必要があります。
sync.Once#
前置き:これは進んだ知識です。
プログラミングの多くのシーンでは、高並行性の状況下で特定の操作が 1 回だけ実行されることを保証する必要があります。たとえば、設定ファイルを 1 回だけ読み込む、チャネルを 1 回だけ閉じるなどです。
Go 言語のsync
パッケージには、1 回だけ実行されるシーンに対する解決策としてsync.Once
が提供されています。
sync.Once
にはDo
メソッドが 1 つだけあり、そのシグネチャは以下の通りです:
func (o *Once) Do(f func()) {}
備考:実行する関数f
に引数が必要な場合は、クロージャと組み合わせて使用する必要があります。
設定ファイルの読み込み例#
高コストの初期化操作を実際に使用する時に遅延させることは良い実践です。なぜなら、変数を事前に初期化する(たとえば、init 関数内で初期化を完了する)ことはプログラムの起動時間を増加させ、実行中にその変数が使用されない可能性があるため、その初期化操作は必ずしも必要ではありません。以下の例を見てみましょう:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Iconは複数のgoroutineから呼び出されるため、並行安全ではありません
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
複数のgoroutine
が並行して Icon 関数を呼び出すと、安全ではありません。現代のコンパイラや CPU は、各goroutine
が直列に一貫性を満たすことを保証しながら、メモリへのアクセス順序を自由に再配置する可能性があります。loadIcons 関数は以下のように再配置される可能性があります:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
この場合、icons
が nil でないと判断しても、変数の初期化が完了したことを意味しません。このような状況を考慮すると、ミューテックスを追加して、icons
の初期化中に他のgoroutine
が操作しないようにすることが考えられますが、これにより性能問題が引き起こされます。
sync.Once
を使用して改造したサンプルコードは以下の通りです:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Iconは並行安全です
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
並行安全なシングルトンパターン#
以下は、sync.Once
を利用して実装した並行安全なシングルトンパターンです:
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync.Once
は内部にミューテックスとブール値を含んでおり、ミューテックスはブール値とデータの安全性を保証し、ブール値は初期化が完了したかどうかを記録します。このような設計により、初期化操作が並行安全であり、初期化操作が複数回実行されることはありません。
sync.Map#
Go 言語に内蔵されている map は並行安全ではありません。以下の例を見てみましょう:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上記のコードは少数のgoroutine
を起動した場合は問題ありませんが、並行性が高くなると、上記のコードを実行するとfatal error: concurrent map writes
エラーが発生します。
このようなシーンでは、map にロックをかけて並行安全を保証する必要があります。Go 言語のsync
パッケージには、開箱即用の並行安全な map であるsync.Map
が提供されています。開箱即用とは、内蔵の map のようにmake
関数で初期化する必要がなく、直接使用できることを意味します。また、sync.Map
にはStore
、Load
、LoadOrStore
、Delete
、Range
などの操作メソッドが内蔵されています。
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
原子操作#
上記のコードでは、ロック操作を使用して同期を実現しました。ロックメカニズムの底層は原子操作に基づいており、通常は CPU 命令を介して実現されます。Go 言語の原子操作は、標準ライブラリsync/atomic
によって提供されます。
atomic パッケージ#
メソッド | 説明 |
---|---|
func LoadInt32(addr *int32) (val int32) | |
func LoadInt64(addr *int64) (val int64) | |
func LoadUint32(addr *uint32) (val uint32) | |
func LoadUint64(addr *uint64) (val uint64) | |
func LoadUintptr(addr *uintptr) (val uintptr) | |
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) | 読み取り操作 |
func StoreInt32(addr *int32, val int32) | |
func StoreInt64(addr *int64, val int64) | |
func StoreUint32(addr *uint32, val uint32) | |
func StoreUint64(addr *uint64, val uint64) | |
func StoreUintptr(addr *uintptr, val uintptr) | |
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) | 書き込み操作 |
func AddInt32(addr *int32, delta int32) (new int32) | |
func AddInt64(addr *int64, delta int64) (new int64) | |
func AddUint32(addr *uint32, delta uint32) (new uint32) | |
func AddUint64(addr *uint64, delta uint64) (new uint64) | |
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修正操作 |
func SwapInt32(addr *int32, new int32) (old int32) | |
func SwapInt64(addr *int64, new int64) (old int64) | |
func SwapUint32(addr *uint32, new uint32) (old uint32) | |
func SwapUint64(addr *uint64, new uint64) (old uint64) | |
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) | |
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交換操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) | |
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) | |
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) | |
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) | |
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) | |
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比較と交換操作 |
例#
ミューテックスと原子操作の性能を比較する例を示します。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 通常版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// ミューテックス版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非並行安全
test(c1)
c2 := MutexCounter{} // ミューテックスを使用して並行安全を実現
test(&c2)
c3 := AtomicCounter{} // 並行安全でミューテックスよりも効率が高い
test(&c3)
}
atomic
パッケージは、低レベルの原子メモリアクションを提供し、同期アルゴリズムの実装に非常に役立ちます。これらの関数は、正しく使用されることを注意深く保証する必要があります。特定の特殊な低レベルのアプリケーションを除いて、チャネルや sync パッケージの関数 / 型を使用して同期を実現する方が良いです。
演習問題#
goroutine
とchannel
を使用して、int64 のランダム数の各桁の合計を計算するプログラムを実装します。- 1 つの
goroutine
を起動して int64 型のランダム数を生成し、jobChan
に送信します。 - 24 個の
goroutine
を起動してjobChan
からランダム数を取得し、各桁の合計を計算してresultChan
に送信します。 - メイン
goroutine
がresultChan
から結果を取得して端末に出力します。
- 1 つの
- ビジネスコードの実行性能を保証するために、以前に書いたログライブラリを非同期記録ログ方式に改写します。