2.7 マルチスレッド

Goを21世紀のC言語だという人もいます。Go言語は設計が簡単で、21世紀で最も重要なのはマルチスレッドだからです。Goは言語レベルでマルチスレッドをサポートしています。

goroutine

goroutineはGoのマルチスレッドのコアです。goroutineは実は最初から最後までスレッドです。しかしスレッドよりも小さく、十数個のgoroutineは低レイヤーで5,6個のスレッドを実現しているだけです。Go言語の内部ではこれらのgoroutineの間ではメモリの共有を実現しています。goroutineを実行するには非常に少ないスタックメモリ(大体4~5KBです。)を必要とするだけです。当然対応するデータによって伸縮しますが、まさにこのためいくつものマルチスレッドタスクを実行することができます。goroutineはthreadに比べより使いやすく、効果的で、便利です。

goroutineはGoのruntime管理を利用したスレッドコントローラです。goroutineはgoキーワードによって実現します。実は単なる普通の関数です。

go hello(a, b, c)

キーワードgoを通じてgoroutineを起動します。例を見てみましょう。

package main

import (
    "fmt"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        runtime.Gosched()
        fmt.Println(s)
    }
}

func main() {
    go say("world") //新しいGoroutinesを実行する。
    say("hello") //現在のGoroutines実行
}

// 上のプログラムを実行すると以下のように出力されます:
// hello
// world
// hello
// world
// hello
// world
// hello
// world
// hello

goキーワードで非常に簡単にマルチスレッドプログラミングを実現することがお分かりいただけるかと思います。 上の複数のgoroutineは同じプロセスで実行されています。メモリのデータを共有しますが、デザイン上共有を利用して通信を行ったりせず、通信によって共有を行うようにしましょう。

runtime.Gosched()ではCPUに時間を他の人に受け渡します。次にある段階で継続してこのgoroutineを実行します。

デフォルトでは、ディスパッチャはプロセスを使うのみで、マルチスレッドを実現するだけです。マルチコアプロセッサのマルチスレッドを実現したい場合は、我々のプログラムでruntime.GOMAXPROCS(n)をコールすることでディスパッチャに同時に複数のプロセスを使用させる必要があります。GOMAXPROCSは同時に実行するロジックコードのシステムプロセスの最大数を設定し、前の設定を返します。もしn < 1であった場合、現在の設定は変更されません。Goの新しいバージョンでディスパッチャが修正されれば、これは削除されるでしょう。Robによるマルチスレッドの開発についてはこちらに文章があります。http://concur.rspace.googlecode.com/hg/talk/concur.html#landing-slide

channels

goroutineは同じアドレス空間で実行されます。そのため、共有されたメモリへのアクセスはかならず同期されていなければなりません。では、goroutineの間ではどのようにしてデータの通信を行うのでしょうか。Goはチャネルというとても良い通信機構を提供しています。チャネルはUnix shellとの双方向パイプを作成します。これを通して値を送信したり受信することができます。これらの値は特定の型のみが許容されます。チャネル型。channelを定義した場合チャネルに送信する値の型も定義しなければなりません。ご注意ください。かならずmakeを使ってchannelを作成します。

ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})

channelは<-演算子を使ってデータを送ったり受けたりします。

ch <- v    // vをchannel chに送る。
v := <-ch  // chの中からデータを受け取り、vに代入する。

これを私達の例の中に当てはめてみましょう:

package main

import "fmt"

func sum(a []int, c chan int) {
    total := 0
    for _, v := range a {
        total += v
    }
    c <- total  // send total to c
}

func main() {
    a := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(a[:len(a)/2], c)
    go sum(a[len(a)/2:], c)
    x, y := <-c, <-c  // receive from c

    fmt.Println(x, y, x + y)
}

デフォルトでは、channelがやり取りするデータはブロックされています。もう片方が準備できていなければ、Goroutinesの同期はもっと簡単になります。lockを表示する必要はありません。いわゆるブロックとは、もし(value := <-ch)を読み取った場合、これはブロックされます。データを受け取った段階で(ch<-5)を送信するいずれのものもデータが読み込まれるまでブロックされます。バッファリングの無いchannelは複数のgoroutineの間で同期を行う非常に優れたツールです。

Buffered Channels

上ではデフォルトでバッファリング型の無いchannelをご紹介しました。しかし、Goはchannelのバッファリングの大小も指定することを許しています。とても簡単です。つまりchannelはいくつもの要素を保存することができるのです。ch:= make(chan bool, 4)は4つのbool型の要素を持てるchannelを作成します。このchannelの中で、前の4つの要素はブロックされずに入力することができます。5番目の要素が入力された場合、コードはブロックされ、その他のgoroutineがchannelから要素を取り出すまで空間を退避します。

ch := make(chan type, value)

value == 0 ! バッファリング無し(ブロック)
value > 0 ! バッファリング(ブロック無し、value個の要素まで)

下の例をご覧ください。ローカルでテストしてみることができます。対応するvalue値を変更してください

package main

import "fmt"

func main() {
    c := make(chan int, 2)//2を1に修正するとエラーが発生します。2を3に修正すると正常に実行されます。
    c <- 1
    c <- 2
    fmt.Println(<-c)
    fmt.Println(<-c)
}

RangeとClose

上の例では、2回cの値を読み込む必要がありました。これではあまり便利ではありません。Goはこの点を考慮し、rangeによってsliceやmapを操作するのと同じ感覚でバッファリング型のchannelを操作することができます。下の例をご覧ください。

package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 1, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x + y
    }
    close(c)
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

for i := range cでこのchannelがクローズを明示されるまで連続してchannelの中のデータを読み込むことができます。上のコードでchannelのクローズが明示されているのが確認できるかと思います。生産者はcloseビルトイン関数によってchannelを閉じます。channelを閉じた後はいかなるデータも送信することはできません。消費側はv, ok := <-chという式でchannelがすでに閉じられているかテストすることができます。もしokにfalseが返ってきたら、channelはすでにどのようなデータも無く、閉じられているということになります。

生産者の方でchannelが閉じられる事に注意してください。消費側ではありません。これは容易にpanicを引き起こします。

また、channelはファイルのようなものでないことにも注意してください。頻繁に閉じる必要はありません。何のデータも送ることが無い場合またはrangeループを終了させたい場合などで結構です。

Select

ここではひとつだけのchannelがある状況についてご紹介しました。では複数のchannelが存在した場合、どのように操作すべきでしょうか。Goではキーワードselectを提供しています。selectを通して、channel上のデータを監視することができます。

selectはデフォルトでブロックされます。channelの中でやりとりされるデータを監視する時のみ実行します。複数のchannelの準備が整った時に、selectはランダムにひとつ選択し、実行します。

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 1, 1
    for {
        select {
        case c <- x:
            x, y = y, x + y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

selectの中にもdefault文があります。selectは実はswitchの機能によくにています。defaultは監視しているchannelがどれも準備が整っていない時に、デフォルトで実行されます。(selectはchannelを待ってブロックしません。)

select {
case i := <-c:
    // use i
default:
    // cがブロックされた時にここが実行されます。
}

タイムアウト

ときどきgoroutineがブロックされる状況にでくわします。ではプログラム全体がブロックされる状況をどのように回避するのでしょうか?selectを使ってタイムアウトを設定することができます。下のような方法で実現します:

func main() {
    c := make(chan int)
    o := make(chan bool)
    go func() {
        for {
            select {
                case v := <- c:
                    println(v)
                case <- time.After(5 * time.Second):
                    println("timeout")
                    o <- true
                    break
            }
        }
    }()
    <- o
}

runtime goroutine

runtimeパッケージにはgoroutineを処理するいくつかの関数が含まれます:

  • Goexit

    事前に実行されたgoroutineから抜けます。ただし、defer関数は継続してコールされます。

  • Gosched

    事前のgoroutineの実行権限をジェネレートします。ディスパッチャが他の待機中のタスクの実行を予定し、次のある時点でこの位置から実行を復元します。

  • NumCPU

    CPUのコア数を返します。

  • NumGoroutine

    現在実行しているgoroutineの総数を返します。

  • GOMAXPROCS

    実行できるCPUコア数の最大値を設定し、前のコア数を返します。

results matching ""

    No results matching ""