システムコールにみるGo言語のnetパッケージの実装

netパッケージのコードを読む機会があったのでメモ。TCPのエコーサーバーを実行して、呼ばれているシステムコールとその引数を確認した。Go言語がシンプルなインターフェースを提供している裏側で、ノンブロッキングIOやIO多重化を駆使している様子がわかって面白かった。

動作確認用プログラムと実行環境

動作確認用に以下のエコーサーバーとクライアントを書いた。あくまで実験用。

TCP echo server/client to be used with strace

今回はこのエコーサーバーをstraceを通して実行することで、サーバー実行時に呼ばれるシステムコールを確認した。

実行環境はgolang:1.10イメージベースのdockerコンテナ。IPv6をenableにしている。また、コンテナ実行時に--security-opt=apparmor=unconfined --cap-add=SYS_PTRACEオプションを与えてstraceが動くようにしている。

実行結果

以下のようにエコーサーバーを実行した後、別シェルでgo run echo_client.goを実行した。

# go build echo_server.go && strace -e 'trace=!pselect6,futex,sched_yield' ./echo_server
// net.Listen()を呼ぶ箇所までスキップ
...
write(1, "\n===== net.Listen() =====\n", 26
===== net.Listen() =====
) = 26
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 3
close(3)                                = 0
socket(AF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 3
setsockopt(3, SOL_IPV6, IPV6_V6ONLY, [1], 4) = 0
bind(3, {sa_family=AF_INET6, sin6_port=htons(0), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
socket(AF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 5
setsockopt(5, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
bind(5, {sa_family=AF_INET6, sin6_port=htons(0), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
close(5)                                = 0
close(3)                                = 0
socket(AF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_IP) = 3
setsockopt(3, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
setsockopt(3, SOL_SOCKET, SO_BROADCAST, [1], 4) = 0
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET6, sin6_port=htons(8080), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(3, 128)                          = 0
epoll_ctl(4, EPOLL_CTL_ADD, 3, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4234977024, u64=140591399476992}}) = 0
getsockname(3, {sa_family=AF_INET6, sin6_port=htons(8080), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0
write(1, "\n===== ln.Accept() =====\n", 25
===== ln.Accept() =====
) = 25
accept4(3, 0xc420057bd0, [112], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [], 128, 0)               = 0
epoll_wait(4, [{EPOLLIN, {u32=4234977024, u64=140591399476992}}], 128, -1) = 1
accept4(3, {sa_family=AF_INET6, sin6_port=htons(37268), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 5
epoll_ctl(4, EPOLL_CTL_ADD, 5, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4234976816, u64=140591399476784}}) = 0
getsockname(5, {sa_family=AF_INET6, sin6_port=htons(8080), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0
setsockopt(5, SOL_TCP, TCP_NODELAY, [1], 4) = 0
write(1, "\n===== io.Copy() =====\n", 23
===== io.Copy() =====
) = 23
read(5, 0xc4200b6000, 32768)            = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [{EPOLLOUT, {u32=4234976816, u64=140591399476784}}], 128, 0) = 1
epoll_wait(4, [{EPOLLIN|EPOLLOUT, {u32=4234976816, u64=140591399476784}}], 128, -1) = 1
read(5, "test", 32768)                  = 4
write(5, "test", 4)                     = 4
read(5, 0xc4200b6000, 32768)            = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [], 128, 0)               = 0
epoll_wait(4, [{EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=4234976816, u64=140591399476784}}], 128, -1) = 1
read(5, "", 32768)                      = 0
write(1, "\n===== conn.Close() =====\n", 26
===== conn.Close() =====
) = 26
epoll_ctl(4, EPOLL_CTL_DEL, 5, 0xc420057d34) = 0
close(5)                                = 0
write(1, "\n===== ln.Accept() =====\n", 25
===== ln.Accept() =====
) = 25
accept4(3, 0xc420057bd0, [112], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [], 128, 0)               = 0
epoll_wait(4,

ここからはnet.Listen()、ln.Accept()、conn.Read()、conn.Write()、conn.Close()について、呼ばれているシステムコールを確認していく。

net.Listen()が呼ぶシステムコール

以下のシステムコールを呼んでいた。大別すると、IPv4/IPv6/IPV4 mapped IPv6サポートの確認、ソケットの準備、epollの設定をしている。

socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 3
close(3)                                = 0
socket(AF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 3
setsockopt(3, SOL_IPV6, IPV6_V6ONLY, [1], 4) = 0
bind(3, {sa_family=AF_INET6, sin6_port=htons(0), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
socket(AF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 5
setsockopt(5, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
bind(5, {sa_family=AF_INET6, sin6_port=htons(0), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
close(5)                                = 0
close(3)                                = 0
socket(AF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_IP) = 3
setsockopt(3, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
setsockopt(3, SOL_SOCKET, SO_BROADCAST, [1], 4) = 0
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET6, sin6_port=htons(8080), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(3, 128)                          = 0
epoll_ctl(4, EPOLL_CTL_ADD, 3, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4234977024, u64=140591399476992}}) = 0
getsockname(3, {sa_family=AF_INET6, sin6_port=htons(8080), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0

前半では、ソケットを作って、bindして、すぐにcloseする作業を繰り返している。最初は謎だったけど、ここでシステムのIPv4/IPv6/IPV4 mapped IPv6サポート状況を確認しているらしい。コード上はこのあたり。確認結果はメモリ上にキャッシュされているので、net.Listen()のたびに確認するわけではない。

後半が本筋で、socket()、bind()、listen()を呼んでいる。コード上はこのあたり。定番の手順だけど、socket作成時のオプションにSOCK_NONBLOCKが付いているので、ノンブロッキングなソケットが作成されていることがわかる。

また、socket作成後にsetsockopt()を3回呼び、IPV6_V6ONLY、SO_BROADCAST、SO_REUSEADDRオプションを設定している。最初の2つはソケット作成時にTCP/UDP/Listen/Dial問わず設定している。最後のSO_REUSEADDRは、bindしたいポートについて、過去の接続がまだ残っていてもbindできるようにするオプション。TCPサーバーではセットしておくのが普通らしい。netパッケージでもTCPソケットがlistenするときに設定している。

最後のepoll_ctl()はIO多重化のための設定。ソケットとイベントをepollインスタンスに登録している。コード上はこのあたり。read/writeイベントに加えて、相手ソケットのcloseイベントも登録している。ちなみにepollインスタンスはプロセス全体で共有されていて、既に作成済みなので、ここでは作成されていない。straceの結果を遡ってみると、以下のように作成されていた。

openat(AT_FDCWD, "/proc/sys/net/core/somaxconn", O_RDONLY|O_CLOEXEC) = 3
epoll_create1(EPOLL_CLOEXEC)            = 4
epoll_ctl(4, EPOLL_CTL_ADD, 3, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4234977024, u64=140591399476992}}) = 0

netパッケージの初期化中に作成している。Go 1.9から、file IOでもnetwork pollerを使うようになったみたい。

ln.Accept()が呼ぶシステムコール

以下のシステムコールを呼んでいた。接続の待ち受けと、確立した接続の初期設定をしている。

accept4(3, 0xc420057bd0, [112], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [], 128, 0)               = 0
epoll_wait(4, [{EPOLLIN, {u32=4234977024, u64=140591399476992}}], 128, -1) = 1
accept4(3, {sa_family=AF_INET6, sin6_port=htons(37268), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 5
epoll_ctl(4, EPOLL_CTL_ADD, 5, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4234976816, u64=140591399476784}}) = 0
getsockname(5, {sa_family=AF_INET6, sin6_port=htons(8080), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0
setsockopt(5, SOL_TCP, TCP_NODELAY, [1], 4) = 0

前半では、スレッドレベルのブロックをできるだけ避けつつ接続を待ち受けている。コード上はこのあたり。最初はとりあえずaccept4()を呼んで、確立した接続がないか確認している。今回は接続がなかったようなので、goルーチンレベルで一度ブロックしている。コード上はこのあたり。ちなみに少し紛らわしいけど、accept4()のSOCK_NONBLOCKオプションは、接続確立したソケットをノンブロッキングにするためのもの。リッスンソケットはノンブロッキングになるよう既に設定済み。

次に、スレッドが実行可能なgoルーチンを探すところで、epoll_wait()を2回呼んでいる。1回目はノンブロッキング、2回目はブロッキング。コード上はこのあたり。いきなり2回呼ぶわけではなく、実行キューやGCや他のスレッドのキューの様子を見て、余裕があれば呼んでいるっぽい。特に2回目のepoll_wait()はブロッキングになっているので、他の処理が全くない時にだけ呼んでいる様子。

2回目のepoll_wait()は2番目の引数にEPOLLINイベントが入っているので、readできるようになったことがわかる。Readできるようになると、ここここのコードが呼ばれてgoルーチンがアンブロックされる。アンブロックするgoルーチンはイベント内のデータから辿れるようになっている。

conn.Read()、conn.Write()が呼ぶシステムコール

io.Copy()を通して以下のシステムコールを呼んでいた。read()で相手からのパケットを読み、write()で相手にパケットを送り、最後にread()でEOFを受け取っている。

read(5, 0xc4200b6000, 32768)            = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [{EPOLLOUT, {u32=4234976816, u64=140591399476784}}], 128, 0) = 1
epoll_wait(4, [{EPOLLIN|EPOLLOUT, {u32=4234976816, u64=140591399476784}}], 128, -1) = 1
read(5, "test", 32768)                  = 4
write(5, "test", 4)                     = 4
read(5, 0xc4200b6000, 32768)            = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, [], 128, 0)               = 0
epoll_wait(4, [{EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=4234976816, u64=140591399476784}}], 128, -1) = 1
read(5, "", 32768)                      = 0

io.Copy()はEOFが返るまで、conn.Read()からの読み込みとconn.Write()への書き込みを繰り返す。conn.Read()はノンブロッキングにread()を呼び、パケットが来てなければepoll_wait()で待つ。コード上はこのあたり。Accept()と似た処理になっていて、呼ばれるシステムコールも似ている。

Accept()と比べて異なるのは、epoll_wait()の結果。見ての通り、どちらのepoll_wait()にもEPOLLOUTイベントが返っている。これは、fdが接続済みソケットを指していて、ソケットが書込み可能な状態になっているためだと思う。

最初見たときは、1回目のepoll_wait()がEPOLLOUTイベントを返しているので、そこでgoルーチンがアンブロックされるのかと思った。それなのに2回目のepoll_wait()が呼ばれていて不思議だったが、read待ちのgoルーチンとwrite待ちのgoルーチンは別々に管理されるため、EPOLLOUTイベントではread待ちのgoルーチンはアンブロックされないようだった。

conn.Write()もAccept()と似た処理だけど、既に書込み可能な状態なので、一発目のwrite()で成功している。コード上はこのあたり

2回目のconn.Read()は1回目のconn.Read()と大体同じ。今度は相手がソケットをclose()したので、2回目のepoll_wait()でEPOLLRDHUPイベントが返っている。その後のread()でもEOFが返っている。

conn.Close()が呼ぶシステムコール

以下のシステムコールを呼んでいた。

epoll_ctl(4, EPOLL_CTL_DEL, 5, 0xc420057d34) = 0
close(5)                                = 0

比較的単純で、epollの登録解除とソケットのclose()をしているのみ。コード上はこのあたり

まとめ

TCPのエコーサーバー実行時に呼んでいるシステムコールを確認して、ノンブロッキングIOとIO多重化を多用していることがわかった。Goはスケールするプログラムをシンプルに書けるイメージがあるけど、今回はその裏側にある泥臭い処理の一部を垣間見られた気がする。

Goでpingコマンドを書いてみた

最近読んだUNIX Network Programmingに触発されてpingコマンドを書いてみた。

ping command in golang, just for fun

本家pingと違って、統計情報とかは保存していない。MessageのPack/Unpack周りの実装はnet/dnsmsg.goをかなり参考にしている。

Linux環境で実行してみる。Raw socketを使っているので、実行にはroot権限が必要。

# go run ping.go golang.org
23 bytes from 216.58.197.177: seq=1, ttl=61, rtt=10.816086ms
23 bytes from 172.217.25.241: seq=2, ttl=61, rtt=10.120621ms
23 bytes from 216.58.197.177: seq=3, ttl=61, rtt=8.431578ms

一応tcpdumpを確認してみる。

# tcpdump -v icmp
tcpdump: listening on eth0, link-type EN10MB (Ethernet), capture size 262144 bytes
10:02:34.835872 IP (tos 0x0, ttl 64, id 23326, offset 0, flags [DF], proto ICMP (1), length 43)
    6f3d0f1790c6 > nrt12s02-in-f177.1e100.net: ICMP echo request, id 1531, seq 1, length 23
10:02:34.845779 IP (tos 0x0, ttl 61, id 17225, offset 0, flags [DF], proto ICMP (1), length 43)
    nrt12s02-in-f177.1e100.net > 6f3d0f1790c6: ICMP echo reply, id 1531, seq 1, length 23

DFビットがONになっているのが謎だったけど、これはpath mtu discoveryをしているためらしい。試しにnet.ipv4.ip_no_pmtu_disc=1 にしたらOFFになった。TCPじゃなくてもdiscoveryするんだね。

Go言語からkubernetes APIを呼び出すクライアント「client-go」の紹介

これはKubernetes2 Advent Calendar 2017 25日目の記事です。この記事ではclient-goというライブラリを使ってgo言語からkubernetesのAPIを呼び出す方法を紹介します。

とりあえず使ってみるところから、任意のkubernetesクラスタにつなぐ方法、service accountを利用した認証方法、テストの書き方まで紹介します。

Client-goとは

Kubernetes APIを叩くためのGo製ライブラリです。おなじみのkubectlコマンドでも使われています。僕の関わっているサービスでは、kubernetes外のサーバーやkubernetes内部のコンテナからkubernetesを操作したいときに使っています。

インストールは、以下のコマンドでできます。

$ go get k8s.io/client-go/...

インストールに関する詳細はドキュメントを参照して下さい。ちゃんと使うならバージョン管理してね、といったことが書かれています。

まずは叩いてみる

以下の例ではPod一覧を取得しています。なお、実行にはkubectlが必要です(後述しますが、client-go利用の際にkubectlが常に必要になるわけではありません)。

まずはnewClient()でクライアントを生成しています。その際の認証情報は、~/.kube/configファイルから取得しています。これはkubectlが作成するファイルで、接続先サーバーや認証情報が格納されています。

その後、client.CoreV1().Pods("").List()でPod一覧を取得しています。Pods("")の引数ではnamespaceを指定することができ、空文字列にすると、全namespace指定と同等になります。

実行すると、Pod一覧が出力されます。

$ go run get_started.go
kube-addon-manager-minikube
kube-dns-6fc954457d-f58rp
kubernetes-dashboard-jcjjn

接続先kubernetesクラスタを指定する

先程の例では、kubectlの作成したクラスタ/認証情報を使用しました。kubectlは複数のクラスタが存在する場合でもcontextを切り替えて対応できますが、先程はkubectl config current-contextで得られるcontextを自動的に使うようになっていました。

そこで今度は、contextを指定して任意のクラスタに接続できるようにしてみます。こちらも、実行にはkubectlが必要です。

主に変更したのはnewClient()です。clientcmd.ConfigOverrides{}を使用してcontextを指定しています。試したことはないですが接続先サーバーや認証情報の上書きもできるようです。clientcmd.NewDefaultClientConfigLoadingRules()では、クラスタ/認証情報のロード方法を指定しています。デフォルトでは、~/.kube/configの情報がロードされます。これらの情報を元にconfigとclientを作成しています。

実行の際は、引数でcontextを指定します。kubectl config get-contextsで得られるcontext名が指定できます。

$ go run choose_k8s_cluster.go minikube
kube-addon-manager-minikube
kube-dns-6fc954457d-f58rp
kubernetes-dashboard-jcjjn

Kubernetes内部のPodからAPIを叩く

ここまでの例では、kubectlの作成したクラスタ/認証情報を使用してクライアントを作成していました。kubectlが利用できる環境ではここまでの方法でも良いですが、kubernetes内部のPodからAPIを叩くような場合には、kubectlがインストールされていないことがあります。

そのような場合、kubernetesがPodに対して付与するservice accountを使うと便利です。Service accountについてはこのドキュメントが詳しいです。

主に変更したのはnewClient()です。rest.InClusterConfig()でservice accountを利用するための設定をしています。/var/run/secrets/kubernetes.io/serviceaccount/以下のトークンや証明書が使われます。

Pod内のコンテナにログインして実行すると、~/.kube/configのない環境でもPod一覧を取得できることがわかります。 (もしパッケージがない旨のエラーが出たら、client-goをインストールして下さい)

root@golang:/go# ls ~/.kube
ls: cannot access '/root/.kube': No such file or directory
root@golang:/go# go run in_cluster_client.go
golang
kube-addon-manager-minikube
kube-dns-6fc954457d-f58rp
kubernetes-dashboard-jcjjn

テストを書く

最後にテストを書いてみます。client-goに含まれるfakeパッケージを使用すると、テスト用のデータを予め仕込んだ上で各種APIを叩くことができます。

newClient()でのクライアント作成はfake.NewSimpleClientset()を呼ぶだけになりました。addTestData()でテスト用のPodを作成していますが、方法は非テストコードでPodを作成する場合と同じです。今回は省略しましたが、ここでPodの状態や各種ポリシーを設定することもできます。

実行すると、作成したテスト用Podが出力されることがわかります。

$ go run normal_case_test.go
test

今度は、kubernetes APIがエラーを返す場合のテストを書いてみます。

setServerError()API呼び出し時にエラーを返すようにしています。テスト用クライアントは内部にReactionChainと呼ばれるハンドラリストをもっていて、先頭のハンドラから順番にリクエストを処理していきます。上記のコードでは常にエラーを返すハンドラを先頭に挿入しているので、API呼び出しがエラーを返すようになります。ちなみに、PrependReactor()の第一引数でverb、第二引数でresourceを指定できるので、第一引数をlist、第二引数をpodsとすれば、Pod一覧取得のときだけエラーを返すようになります。

実行すると、Pod一覧を取得するAPIがエラーを返して終了します。

$ go run error_case_test.go
2017/12/22 13:03:08 server error
exit status 1

まとめ

この記事ではclient-goの使い方について簡単に紹介しました。ドキュメントはそれほど充実していないですが、kubectlで使われているだけあって、やりたいことは大体できる印象です。Kubernetes APIを使うときはぜひ利用を検討してみて下さい。