入社して2年が経ったけど転職・・・しません
CyberAgent に入社して2年が経ちました。今日から3年目です。 入社してからは CIU というインフラ横断の組織で開発を主にしていました (入社した時はまだ SIA って組織名でしたけどね)
入社してから何をしてたか
AKE というコンテナ基盤を改修、ユーザさんのサポートをしていました。 そういったことを率先していたからかプロダクトオーナーの権限を一部委譲していただけたりもしました。
それから AKE のリプレイスということで AKE v2 の技術調査、一部コンポーネントの設計なんかを色々やらせてもらいました。 詳しくはこの間の CA Developers Conference という社外向けの発表を見ていただけると。(僕もライブQ&Aに出てます)
あとは ML 基盤の開発なんかも始まって2軸体制になってコンテキストスイッチがなかなか大変でした。今も大変ですけど。 ML 基盤の話はこちらを見ていただけるのが早いです。
という感じでプライベートクラウドだからこそできる開発を存分に楽しませていただきました。
これからは開発のピークは超えてきているので運用も本格的に始まってくるので SRE や SA 的立ち回りもしっかりできてくるといいですね。 開発してはいおしまいというのは昨今の潮流にも反しますので。
ちなみに外部登壇とかだとソフトウェアエンジニアって名乗ってます。理由としてはインフラだけ見てるわけではないし SRE 的立ち位置でもないし、でもコードは書いてるし運用もしてるなぁということで良い落とし所がソフトウェアエンジニアって肩書きだったからですね。CIU のこと知らない人に説明するとき苦労してます。
発表系
正直外部発表とか他の人に批判されそうなので乗り気ではなかったりするんですが、せっかく頑張ったことを発表させてくれる機会があるならということで色々やってました。 これからもちょくちょくできたらいいですね。
CA 1day Youth Bootcamp CIU Kubernetes https://t.co/otOjiLdgFz
— katsuya kawabe 🏂 (@KKawabe108) 2021年11月25日
CA 1day Youth Bootcamp というイベントで使用した Kubernetes の基礎をなんとなーく理解するためのハンズオン資料です。
口頭でカバーした説明も多いですが、できるだけ参加者の方のお手元で後日振り返りがしやすいように作りました
OSS 系
Cluster API を AKE で使ってるのですこーしだけコントリビュートさせてもらったりしました。 Cluster API は難しいプロダクトなんですが、これからも継続的に改善のお手伝いをできたらいいなって感じです。 ぶっちゃけ Cluster API Provider OpenStack Provider は AKE で独自実装して Origin でも使えそうなコードが結構あるんですが腰が上がらずにいます。やる気がほしい
これから
興味あることをやりつつしっかり会社の事業とかに貢献できたらいいなって思います。 この会社いろんな人やプロダクトがあるので見てて飽きません。
ということで CIU では人を募集していますので上記のプロダクトに関わりたいという方は是非応募お願い致します。
Custom Controller の開発で使えそうなコード集 #Kubernetes2 Advent Calendar 2020 16日目
この記事は Kubernetes2 Advent Calendar 2020qiita.com の16日目の記事です。
Custom Controller で よく使う、使えそう なコードの断片をラフに紹介してみようと思います。
想定する環境
- Kubebuilder v2
- go 1.14
今回実装した CRD の example
apiVersion: nginx.k8s-2020.advent.calendar/v1 kind: Nginx metadata: name: nginx-sample spec: name: nginx-1 replices: 1
Custom Controller のロジック
- nginxリソースが作成されると nginx の Deployment と Service を作成
- nginxリソースが削除されると nginx の Deployment と Service を削除
サンプルなので非常に簡単なものになっています。
以降のコードではこちらの CR をアプライ、Controller が処理する前提で話を進めます。
Delete Reconcile の開始
if !nginx.DeletionTimestamp.IsZero() { return r.reconcileDelete(ctx, logger, nginx) }
Reconcile する Object の Timestamp を判定して削除処理に分岐させるコード。 reconcileDelete 関数は各自で実装しましょう。
NotFound を判別するコード
import ( apierrors "k8s.io/apimachinery/pkg/api/errors" ) err := r.Get(ctx, req.NamespacedName, nginx) if err != nil { if apierrors.IsNotFound(err) { return reconcile.Result{}, nil } return reconcile.Result{}, err }
ok
みたいな別の値で存在可否を返さないこともあるのでこういう書き方でチェックできます。
Patch Helper による Patch
patchHelper, err := patch.NewHelper(nginx, r) if err != nil { return ctrl.Result{}, err } defer func() { if err := patchHelper.Patch(ctx, nginx); err != nil { if reterr == nil { reterr = errors.Wrapf(err, "error patching Nginx %s/%s", nginx.Namespace, nginx.Name) } } }()
sigs.k8s.io/cluster-api/util/patch
パッケージを使用して、Reconcile 内で副作用的にリソースを書き換えた場合、必ず Patch 処理で更新を行うコード。
Before な Object のパラメータを Controller の内部で直接書き換えるのはどうなんだろうと思いますが、後述する Finalizer の実装などと相性が良いです。
Event Recorder で 処理内容を Event Resource として保存
Event Resource を用いることで、Controller の処理結果を kubectl get events
で確認できるようにもなります。
まず、Reconciler に record.EventRecorder
フィールドを追加しましょう。
import ( ... "k8s.io/client-go/tools/record" ) // NginxReconciler reconciles a Nginx object type NginxReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme Recorder record.EventRecorder }
Reconciler の初期化で Recorder の方も初期化します。
if err = (&controllers.NginxReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Nginx"), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("nginx-controller"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Nginx") os.Exit(1) }
Event を記録する場合は次のようにします
r.Recorder.Eventf(nginx, corev1.EventTypeNormal, "Reconcile", "Reconcile Nginx resource %q", nginx.Name)
EventType は他にもあるので用途に応じて使い分けましょう。
Controller のログにも同時に出力されるようになります
2020-12-15T20:59:28.936+0900 DEBUG controller-runtime.manager.events Normal {"object": {"kind":"Nginx","namespace":"default","name":"nginx-sample","uid":"7787eec9-ef85-4d56-ad75-dd4807b077d0","apiVersion":"nginx.k8s-2020.advent.calendar/v1","resourceVersion":"21597"}, "reason": "Reconcile", "message": "Reconcile Nginx resource \"nginx-sample\""}
kubectl get events
でも確認できます
2m8s Normal Reconcile nginx/nginx-sample Reconcile Nginx resource "nginx-sample"
Finalizer の付与と削除
Finalizer はざっくり説明すると、その Object が削除される前に行う処理を定義するものです。 CR に従属する別のリソースを CR が削除される前に削除するというのを保証するために使われます。
Custom Controller ではこの Finalizer キーを Object に付与、削除する処理を書くことでこれらを実現することができます。
Finalizer の付与
import ( "context" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ... ) ... controllerutil.AddFinalizer(nginx, nginxv1.NginxFinalizer)
kubebuidler books には Object の metadata フィールドにキーを append するように書いていますが、こちらの方が楽に書けます。
先ほどの patchHelper
を併用することで、キーを付与後に行う更新忘れもなくなります。
Finalizer の削除
CR の削除の前処理が終わったら最後は Finalizer を削除しましょう。
こちらの RemoveFinalizer
は defer
で呼ばないように注意しましょう。他の削除ロジックでエラーを返した場合も実行されて、意図せず CR が削除されてしまいます。
そうなった場合、従属する Object を kubectl
で手で削除していかなくてはいけません。
func (r *NginxReconciler) reconcileDelete(ctx context.Context, logger logr.Logger, nginx *nginxv1.Nginx) (ctrl.Result, error) { // Delete some objects controllerutil.RemoveFinalizer(nginx, nginxv1.NginxFinalizer) return ctrl.Result{}, nil }
OwnerRefrence 周りのTips
書こう思ったらすでに某書籍でしっかり書かれていたっぽいのでリンクだけ貼っておきます
ObjectReference で外部リソースへの依存を表現
import (corev1 "k8s.io/api/core/v1") type NginxSpec struct { HogeRef *corev1.ObjectReference }
これによって CR のフィールドに他リソースへの参照を持たせることができます。
apiVersion: nginx.k8s-2020.advent.calendar/v1 kind: Nginx metadata: name: nginx-sample spec: name: nginx-1 replices: 1 hogeRef: apiVersion: hoge.io kind: Hoge name: hogefuga
名前だけでも良い場合もあると思いますが、kind
、apiVersion
によってコードによる参照がしやすくなります。
具体的には以下のような感じ
import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/controller-runtime/pkg/client" ) if nginx.Spec.HogeRef != nil { obj, err := GetExternalResource(ctx, r.Client, nginx.Spec.HogeRef, nginx.Namespace) // do something } func GetExternalResource(ctx context.Context, c client.Client, ref *corev1.ObjectReference, namespace string) (*unstructured.Unstructured, error) { obj := new(unstructured.Unstructured) obj.SetAPIVersion(ref.APIVersion) obj.SetKind(ref.Kind) obj.SetName(ref.Name) key := client.ObjectKey{Name: obj.GetName(), Namespace: namespace} if err := c.Get(ctx, key, obj); err != nil { return nil, errors.Wrapf(err, "failed to retrieve %s external object %q/%q", obj.GetKind(), key.Namespace, key.Name) } return obj, nil }
注意しなくてはいけないのは、obj
は *Unstructured
で返してるので、Goの構造体としてフィールドにメンバ変数を介してアクセスしたい場合はそれぞれの Object にマッピングする必要があります。
その場合は runtime
パッケージの FromUnstructured
を使うと楽にできます
import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) obj := &unstructured.Unstructured{} out = &nginxv1.Hoge{} runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &hoge)
番外編: PodTemplateSpec から Hashを作成する関数
Deployment から作成された Pod って <deployment name>-<PodTemplateSpec Hash>-<よくわからないハッシュ値っぽいやつ>
という命名規則っぽいです(ドキュメントとかに書いてあるのかわからなかったので推測です
この Spec からハッシュ値を作成するコードを下に書いておきます。
func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) { hasher.Reset() printer := spew.ConfigState{ Indent: " ", SortKeys: true, DisableMethods: true, SpewKeys: true, } printer.Fprintf(hasher, "%#v", objectToWrite) } // DeepHashObjectToString creates a unique hash string from a go object. func DeepHashObjectToString(objectToWrite interface{}) string { hasher := md5.New() hash.DeepHashObject(hasher, objectToWrite) return hex.EncodeToString(hasher.Sum(nil)[0:]) }
この DeepHashObject 関数を用いて上位8byte を抽出して replicaset の名前にしたものが下記のコードになります
func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string { podTemplateSpecHasher := fnv.New32a() hashutil.DeepHashObject(podTemplateSpecHasher, *template) // Add collisionCount in the hash if it exists. if collisionCount != nil { collisionCountBytes := make([]byte, 8) binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount)) podTemplateSpecHasher.Write(collisionCountBytes) } return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32())) }
コード自体は k8s.io/kubernetes/pkg/util/hash
とかにあるんですが、go mod の依存関係のせいか、直接は import できないようです。
まとめ
いくつか Kubernetes の Custom Controller 開発で使えそうなコードを紹介させていただきました。 他の書籍や記事と被っているかもしれません。
参考
入社しました
4月に渋谷の某企業に就職しました。 別に会社名隠したところですぐバレそうですが、なんか社名のっけたくなかっただけです。
会社どうですか
人格者かつ技術力高い人が多いのでストレスフリーです。 あと会社の文化的なものによるのかビジネスの観点もしっかりしている人が多い印象です。
今なにしているか
社内向けのKubernetes as a Servieを作っています。 OpenStackを触ったりKubernetesを触ったりする生活です。コードも書いてます。
CNDT2020で同期と一緒に発表するのでよかったら聞いてください。 大きいタスクはほぼ同期がやってくれたので僕は足りないものをひたすらコード書いて実装していくという仕事だったので気持ちは楽でした
君サーバサイドエンジニアじゃないの?なんでインフラエンジニアやってんの?
インフラエンジニアを名乗るにはおこがましいぐらい周りに詳しい方がたくさんいるのでインフラ寄りのソフトウェアエンジニアという表現が正しいです。 逆にコードを書くという点では自分が一番やってやるぞ!という気概で今働いてます。楽しいです。
インフラ寄りのソフトウェアを作るのが楽しそうだなとずっと思ってたので今の部署を選んだという感じです。 なので名刺もインフラエンジニアじゃなくてソフトウェアエンジニアという肩書きで刷ってもらいました。
KubernetesやOpenStack関連のOSSに詳しい方が多いのでその助けもあって早速ingress-nginxというソフトにちょっとしたコントリビュートもできたりしました。
これから何をしていくのか
Kubernetes as a Serviceの開発も続けるし、今まで触れてこなかったハードウェアについてもしっかり学んでいきたいです。 サーバとかストレージ触ってみたい。
最後に
研究生活のストレスから開放されたおかげであらゆることに菩薩の心で臨むことができていてすこぶる体調が良いです。 在学中にご心配かけた方も何人かいるんですが、その節はお世話になりました。
プログラミングを教えて欲しい人たちへ
最近「プログラミングを教えて欲しい」と言われることが多くなりました。 一々LINEやFacebookで長文書くのも面倒なのでここに僕の回答を書いておきます。
手取り足取り教えるのは無理
これは申し訳ないけど自分のことで手一杯なので有償無償問わず, つきっきりは無理という話。 その上でよくある質問について答えていきます
GoogleやAmazon, Facebookみたいなインパクトのあるプログラミングができるようになりたい!
そんなの教えられるなら僕は真面目に仕事してないと思います。諦めましょう
Webサイトを作りたい
HTML, JavaScript, CSSでぐぐりましょう。
デザイン?BootstrapかMaterialDesignでいいんじゃないですかね?
最強の選択肢であるWordPressもありました。WordPressやりたかったらLAMPでググってください
ゲーム作りたい
Unityでググればいいんじゃないですかね?
リアルなやつだったらUE4とかなのかな?
なんかわからんけどプログラミングしてみたい
「プログラミング入門 windows」でググって適当に出てきた記事を実践してみればいいんじゃないですかね? 最近だとProgateとか良質なサイトもあるっぽいし。使ったことないから知らんけど
〇〇っぽいサービス作りたい
「〇〇 clone application」とかでググると英語だけど動画とかで既存のアプリケーションの内部アーキテクチャとかデザイン込みで説明してくれてる動画あるから、 それ参考にすればいいんじゃないですかね?英語わからんから知らんけど
なんの言語がオススメですか
全部。COBOLとかFORTRANみたいな化石はお勧めせんけど。
なんの勉強から始めればいいかわかりません
僕もわからん
とりあえず英語から始めれば良いと思います。これはマジで
まとめ
とりあえずググれ
Envoy ビルドメモ
妙に詰まったのでメモ
環境
GCE: Ubuntu 16.04 Goインストール済み
手順
まず, clang-9 + llvm を入れる(7.0以上が条件らしい)
依存パッケージインストール
{ sudo apt-get update sudo apt-get upgrade sudo apt-get install \ libtool \ cmake \ automake \ autoconf \ make \ ninja-build \ curl \ unzip \ virtualenv }
リポジトリクローンとビルドコンフィグの設定
git clone https://github.com/envoyproxy/envoy cd envoy echo "build --config=clang" >> ./tools/bazel.rc echo 'build --action_env=PATH="/usr/local/bin:/opt/local/bin:/usr/bin:/bin"' >> ./tools/bazel.rc go get -u github.com/bazelbuild/buildtools/buildifier && export BUILDIFIER_BIN=$GOPATH/bin/buildifier go get -u github.com/bazelbuild/buildtools/buildozer && export BUILDOZER_BIN=$GOPATH/bin/buildozer
ビルド
bazel build //source/exe:envoy-static
初回は超長い
Kubernetes The Hard Way におけるNodeを跨いだPod間のパケットフロー
何をするのか
Kubernetesの構築は各種クラウドベンダによるサービスによって非常に簡易化されている. 自前のサーバにKubernetesをデプロイする場合もkubeadmのようなツールによって必要なバイナリを全て手でインストールするというような手間を取る必要はなくなってきている.
しかし, 上記のリポジトリではGCEで用意したUbuntuインスタンスに全て手動でKubernetesのコンポーネントをインストールしていくというハンズオンを体験できる.
今回はそのハンズオンで構築したKubernetes上で実際にPodからPodへ向けてのパケットフローがどのように行われるかを検証した.
環境
検証の環境は以下の画像のようになる. 厳密には3台のマスタに3台のワーカがGCE上で動作しているが今回検証で使用したのは2つのノードだけである.
ストーリー
今回は以下のストーリーにおけるパケットの転送の様子を見てみる
pod 1からpod 2へpingする
pod 1からpod 2へpingする
この時, Node 1上でcnio0 ネットワークインタフェース(NI)をtcpdumpすると以下のような出力になる
03:45:45.813652 IP 10.200.0.2 > 10.200.1.2: ICMP echo request, id 2816, seq 0, length 64 03:45:45.815508 IP 10.200.1.2 > 10.200.0.2: ICMP echo reply, id 2816, seq 0, length 64 03:45:46.814361 IP 10.200.0.2 > 10.200.1.2: ICMP echo request, id 2816, seq 1, length 64 03:45:46.814807 IP 10.200.1.2 > 10.200.0.2: ICMP echo reply, id 2816, seq 1, length 64 03:45:47.814526 IP 10.200.0.2 > 10.200.1.2: ICMP echo request, id 2816, seq 2, length 64 03:45:47.814916 IP 10.200.1.2 > 10.200.0.2: ICMP echo reply, id 2816, seq 2, length 64
Podに割り当てられたvethからcnio0に対してパケットが転送されていることがわかる.
次にcnio0からNode本来のNIであるens4にパケットが転送されているかを確認したところ以下のような出力になっていた
03:49:47.934135 IP worker-0.us-west1-c.c.xxxx.internal > 10.200.1.2: ICMP echo request, id 5376, seq 0, length 64 03:49:47.935690 IP 10.200.1.2 > worker-0.us-west1-c.c.xxxx.internal: ICMP echo reply, id 5376, seq 0, length 64
ドメイン名をnslookupするとNodeのアドレスにNATされていることがわかる
$ nslookup worker-2.us-west1-c.c.xxxx.internal Server: 127.0.0.53 Address: 127.0.0.53#53 Non-authoritative answer: Name: worker-2.us-west1-c.c.xxxx.internal Address: 10.240.0.20
ここまでの状況をまとめると以下の画像のようになる
NodeのNIまでたどり着いたパケットは宛先が 10.200.0.0/24
に該当しないのでDefault Gatewayに転送する.
この時のGWルータはGCE上で作成したVPCルータとなる.
kubernetes-the-hard-wayによる手順ではVPCルータに以下のルールを追加している
NAME NETWORK DEST_RANGE NEXT_HOP PRIORITY default-route-081879136902de56 kubernetes-the-hard-way 10.240.0.0/24 kubernetes-the-hard-way 1000 default-route-55199a5aa126d7aa kubernetes-the-hard-way 0.0.0.0/0 default-internet-gateway 1000 kubernetes-route-10-200-0-0-24 kubernetes-the-hard-way 10.200.0.0/24 10.240.0.20 1000 kubernetes-route-10-200-1-0-24 kubernetes-the-hard-way 10.200.1.0/24 10.240.0.21 1000 kubernetes-route-10-200-2-0-24 kubernetes-the-hard-way 10.200.2.0/24 10.240.0.22 1000
これによって宛先10.200.1.2のパケットは 10.240.21に転送される.
ここまでの処理でパケットはNode 2のens4 NIに到達した
Node2でのrouteは以下のようになっているのでcnio0に転送される. cnio0にはvethが接続されており, あとはpodのvethに転送された後, ペアのeth0に送信され,往路のパケット転送が完了する.
route Kernel IP routing table Destination Gateway Genmask Flags Metric Ref Use Iface default _gateway 0.0.0.0 UG 100 0 0 ens4 10.200.1.0 0.0.0.0 255.255.255.0 U 0 0 0 cnio0 _gateway 0.0.0.0 255.255.255.255 UH 100 0 0 ens4
復路もVPCルータを介してNode 1に届くまでは一緒だが, Pod 2から送出されるパケットの宛先IPは Pod 1のIPアドレスではなく, Node 1のens4のIPアドレスとなっている. これは下記のiptablesのルールによってSNATされているためである.
Chain POSTROUTING (policy ACCEPT 7 packets, 696 bytes) pkts bytes target prot opt in out source destination 47471 4663K KUBE-POSTROUTING all -- * * 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */ 97 7561 CNI-be6576a86b4e6dd707f727f8 all -- * * 10.200.2.2 0.0.0.0/0 /* name: "bridge" id: "6f7cc23d7330f226cf75461157038a8718c6bb9b03a53a4bdfd7c1388d49fcfa" */ 0 0 CNI-014fd6fdf5198c5b6219910e all -- * * 10.200.2.3 0.0.0.0/0 /* name: "bridge" id: "0c54384b9da08224e9733348a2ef7da105c285ebd50c62a0c30cebf0712e5e55" */ Chain CNI-be6576a86b4e6dd707f727f8 (1 references) pkts bytes target prot opt in out source destination 0 0 ACCEPT all -- * * 0.0.0.0/0 10.200.2.0/24 /* name: "bridge" id: "6f7cc23d7330f226cf75461157038a8718c6bb9b03a53a4bdfd7c1388d49fcfa" */ 97 7561 MASQUERADE all -- * * 0.0.0.0/0 !224.0.0.0/4 /* name: "bridge" id: "6f7cc23d7330f226cf75461157038a8718c6bb9b03a53a4bdfd7c1388d49fcfa" */
VPCルータからNode 1に届けられたリプライパケットはPod1からリクエストされた際にiptablesで適用されたPOSTROUTINGテーブルのIPマスカレードのルールが icmpパケット内のシーケンスIDからPod1のIPへ逆NATする.
POSTROUTINGの返信における適用: unix.stackexchange.com
これによって宛先IPはPod1の 10.200.0.2/32
となることでcnio0にフォワーディングされ,
cnio0がもつ内部テーブルから宛先のvethへパケットが転送され, Pod 1へ正しくルーティングされる.
まとめ
今回のハンズオンで実験した環境はdockerのネットワークと同等である.
=> マルチホストでのDocker Container間通信 第1回: Dockerネットワークの基礎 - UZABASE Tech Blog
そのためflannelのようなCNIにおけるvxlanによるパケットのラップやvtepによるMAC addressの解決は起こっていない. 基本的にはNodeのiptablesにおけるNATのみでNodeをまたいだpod間の通信は行われていることがわかった.
謝辞
今回疑問の解決に協力していただいたTwitterのフォロワーの方やKubernetes Slackの方々にこの場を借りてお礼申し上げます.
About etcd and NoSQL
はじめに
本記事は
adventar.org における19日目の記事になります.
Previous ICCV2019 Best paper SinGANのできる事できない事[実践編] - Qiita
Next @Shagamii による記事
となっていますのでよろしければこちらもどうぞ.
さて, 今回は昨今多く世に出てきているNoSQL(NewSQL)とそのうちの一つであるetcdについて述べます. etcdに関してよく知らないという方に向けて参考になれば幸いです.
NoSQL, NewSQLについて
NoSQLとは
NoSQLの"No"の意味には様々な解釈がありますが, 僕自身では "Not Only SQL" だと考えています. 所謂, MySQLやPostgreSQLのようなリレーショナルなデータ構造を持たないDBを指しています.
多くのWebソフトウェアでセッションのキャッシュなどで用いられているredisも代表的なNoSQLDBでしょう. redisやmemcachedなどは一般的にKVS(Key Value Store)と呼ばれており, シンプルなデータ構造とAPIが特徴です. KVSはプログラミング言語で言えばmapの構造と似ています.
map[string]string{ "key_1": "value1", "key_2": "value2", }
KVSはインメモリで実装されていることが多く, 読み書きが非常に高速に行えます. NoSQL = KVSとしている記事なども多いですが必ずしもそうではありません.
例えばMongoDBなどはKVSではなくドキュメントベースのDBになります. 仮に, ユーザーの次のようなプロフィールをリレーショナルDBで表現する場合,
Profile: - Name: "Katsuya Kawabe" - Summary: "Student of Shibaura Institute of Technology" - Education: - Bachelor student of SIT, 2014-2018 - Master student of SIT, 2018-2020 - Contact Info: - Twitter: @KKawabe108
テーブルを正規化したなら userテーブル, Universityテーブル, Contact Infoテーブルのように分けて, クエリを実行する際には各テーブルのIDを辿っていきアプリケーションに返すといった手法が一般的でしょう.
ドキュメントベースのDBのMongoDBではこれらの情報を一つのjsonを一行単位のデータとして保持します. こうすることでユーザーのデータをフェッチする際には一つのクエリで済みます.
また, 多くのドキュメントDBは他の行と関連性を持たせることが少ないことを想定しているのでRDBのようなリレーションを保持せず、スキーマレスなストレージとして活用できることがメリットであるとも言われています. 正確にはスキーマレスではなく, スキーマオンリード と呼ばれる読み取りの時に構造が解釈されるものです.
そのメリットとして, 例えば古い行のデータは名前が姓と名に分けていませんが何らかの事情で分けたくなったとします. アプリケーションのコードは次のようになるかもしれません
if (user != null && user.name && !user.first_name) { // 古いデータはfirst_nameが存在しない user.first_name = user.name.split(" ")[0]; }
コードはこれで良いですし, ドキュメントDBであれば追加するデータのスキーマを変更するだけでストレージ上の既存のデータには特段処理を加える必要はありません.
RDBであれば ALTER TABLE ...
のようにマイグレーション処理を行わなければならないでしょう.
昨今なメジャーなドキュメントDBでは
- MongoDB
- RethinkDB
- CouchDB
- Espresso
などがあります. 他にもGraphベースのDBなど調べれば沢山のNoSQL DBが存在しますがここでは割愛します.
NewSQL
NewSQLとは簡単に言えば NoSQL + SQL(With Transaction) といういいとこ取りなDBです NewSQL登場の背景としてはRDBがそのリレーショナルなスキーマに縛られたデータの保持方法からスケールアウトしづらい構造でした. ただ, 最近(5~10年前から)はビッグデータの流行によって, 高負荷なread and writeを処理できず, NoSQLが登場しました.
しかし, 人間は強欲なのでNoSQLにトランザクション機能まで欲しくなりました. そうした結果, 水平オートスケール, SQL機能を持ったNewSQLが登場したわけです.
有名なNewSQLの製品だと
等が挙げられます. そもそも, NoSQLは行間のリレーショナルが存在せず, トランザクションが行われず行をロックしないので高速だったわけですが, 昨今様々なWebソフトウェアでも使用されているSpannerはこのトランザクションにおけるロック機構, データ構造を工夫することによって高速かつ水平オートスケール可能なNewSQLを実現しています.
その技術の変態度合いについては各所の記事で紹介されているのでそちらをご覧ください
また, CockroachDB, TiDBに関してはGoogle F1, Spannerの論文から着想を得たオープンソースなNewSQLで いわゆる HTAP(Hybrid Transactional and Analytical Processing) をサポートするデータベースです. 有名なAWSにおけるAuroraの代替と位置付けるものでもあります.
これらはまだ大きなWebサービスで使用されているという話は聞いていませんが大きく期待できるプロダクトだと勝手に思っています.
etcd
さて, ようやくタイトルにある通り, etcdの話をします.
etcdは先述したNoSQLでありKVSなDBです.
また, 紹介では分散KVSとも呼ばれることが多いです. 複数のキーにまたがる強力なトランザクションはサポートしていなかった気がするのでNewSQLではありません.
KVSはredisを始め, 有名なプロダクトが他にもたくさんあるわけですが, なぜetcdは開発されたのでしょうか
誕生の背景
その理由はシンプルで, 単純にプロダクトレディな高信頼な分散KVSがなかったから と考察します. redisやmemcachedはそもそもクラスタレベルで運用されるのが珍しく, クラスタ間の整合性も高速化のために結果整合性 という最終的に結果がいつか反映されればOKというスタンスで実装されています.
例えば図のようにredisのマスターにx=1という書き込みを行い, B, Cのスレーブには問題なくレプリケーションを行えても, スレーブAに反映する前にマスターがダウンし, スレーブAがマスターに昇格してしまった場合, マスタ-スレーブ間でのデータ不整合に繋がってしまいます.
etcdではこういったクラスタによる運用を前提とし, なおかつノード間での強力なデータ一貫性である 強整合性 を担保することを目的としています. 強整合性は一般的にトレードオフとして, パフォーマンスが低くなったり, ネットワークの切断といったシステムフォールトに対する耐性が低くなったりといった問題がありますが, etcdはいくつかの工夫によってそのリスクを抑えています.
特徴
使用する際に確認できる主な特徴は以下の点です
- データの読み書きをHTTPベースのリクエストで行える
- KVSなので当然書き込み速度, 読み込む速度は高速
- Raft(後述します)を用いたノード間でのデータ一貫性を担保
ユースケース
etcd.io のホームページによると Rook(Kubernetes nativeなストレージエンジン), CoreDNS, OpenStack などに用いられています. それぞれのプロダクト内でノード間のコンフィグ(他のノードのIDを鍵としてIPを解決する)ようなコンフィギュレーションを保持するのに用いられているのが多い印象です. HadoopにおけるZookeeperのような役割を想像していただけるとわかりやすいのではないでしょうか.
Raft
etcdをはじめとした昨今有名な分散KVS(TiKVなど)はバックエンドのコアロジックとしてRaftを使用していることが多いです. 説明しようと思ったのですが時間がなかったので論文とわかりやすい資料を貼っておきますので許してください.
raft.github.io このサイトの raft paperという項目から論文に飛べます
上記のスライドにもありますが, ざっくばらんに言うと「リーダーとレプリケーション間で合意が取れた処理は全てのノードで結果が保証される」というものです. PaxosというGoogle内のChubbyやZookeeperなどにも使われている分散合意アルゴリズムが難しすぎたことによりRaftが生まれました.
Raftを実装することでKVSへの書き込み処理を全てのノードに伝搬させることができ, ネットワークの分断, ノードの唐突なダウンにも耐性がつきます.
実装
せっかくなのでRaftにキーを書き込むまでに何が起こっているのかをコードレベルで追ってみます. etcdはGoで実装されています. その理由としては
etcd is written in Go, which has excellent cross-platform support, small binaries and a great community behind it. Communication between etcd machines is handled via the Raft consensus algorithm.
- クロスプラットフォーム
- バイナリ小さい
- コミュニティが良い
といった理由だそうです.
それではetcdを起動して以下のクライアントツールを用いてキーを書き込みます
etcdをビルドする際にはgit cloneしてルート直下で ./build
で完了します.
ビルドしたバイナリを起動するには
$ ./bin/etcd
で良いです
クライアントツールも同時にビルドされるので以下のコマンドで今回は実験しました.
$ ./bin/etcdctl put mykey "hoge"
これにより, 内部でetcdのサーバを叩いてキーの登録を行ます. etcdのサーバはgRPCによるAPIを提供していてPutコマンドによるエントリーポイントが次のコードになります
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err } return resp.(*pb.PutResponse), nil }
etcd/v3_server.go at master · etcd-io/etcd · GitHub
内部で s.raftRequest
を呼び出してマスター, レプリケーション間でのデータの処理を行ます.
その後, いくつかの関数を経由して processInternalRaftRequestOnce
=> Propose
=> stepWait
=> stepWithWaitOption
で関数の呼び出しが止まります.
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { ai := s.getAppliedIndex() ci := s.getCommittedIndex() if ci > ai+maxGapBetweenApplyAndCommitIndex { return nil, ErrTooManyRequests } r.Header = &pb.RequestHeader{ ID: s.reqIDGen.Next(), } authInfo, err := s.AuthInfoFromCtx(ctx) if err != nil { return nil, err } if authInfo != nil { r.Header.Username = authInfo.Username r.Header.AuthRevision = authInfo.Revision } data, err := r.Marshal() if err != nil { return nil, err } if len(data) > int(s.Cfg.MaxRequestBytes) { return nil, ErrRequestTooLarge } id := r.ID if id == 0 { id = r.Header.ID } ch := s.w.Register(id) cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) defer cancel() start := time.Now() err = s.r.Propose(cctx, data) // 以下略 }
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { if m.Type != pb.MsgProp { select { case n.recvc <- m: return nil case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } } ch := n.propc pm := msgWithResult{m: m} if wait { pm.result = make(chan error, 1) } select { case ch <- pm: if !wait { return nil } case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } select { case err := <-pm.result: if err != nil { return err } case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } return nil }
stepWithWaitOption
では特に何もしていないように一見みえるのですが
ch := n.propc pm := msgWithResult{m: m} if wait { pm.result = make(chan error, 1) } select { case ch <- pm: if !wait { return nil }
このチャネルによる書き込みで別goroutineでリッスンしているハンドラーが起動します.
具体的にはここからraftのコードに移行していきます
func (n *node) run() { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var rd Ready r := n.rn.raft lead := None for { // 中略 select { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. case pm := <-propc: log.Println("Debug: execute propc") m := pm.m m.From = r.id err := r.Step(m) if pm.result != nil { pm.result <- err close(pm.result) }
先ほどmessageを書き込んだchannelをここで読み込み r.Step
関数に渡しているのがわかります.
Step関数も非常に長いので省略しますが, Raft内で使用される論理的な時間のtermのチェックをした後, 問題なければさらに内部のstep関数へ処理を渡します
func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. switch { // 中略 default: if m.Type == pb.MsgProp { log.Println("Debug: raft.Step", m.String()) } err := r.step(r, m) if err != nil { return err } } return nil }
step関数構造体内で関数ポインタとして定義されており, raftの状態(leader, candidate, follower)により関数の実態が変わります
type raft struct { // 省略 step stepFunc }
状態が変化するごとに関数を以下のように置き換えています
func (r *raft) becomeFollower(term uint64, lead uint64) { r.step = stepFollower r.reset(term) r.tick = r.tickElection r.lead = lead r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term) } func (r *raft) becomeCandidate() { // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == StateLeader { panic("invalid transition [leader -> candidate]") } r.step = stepCandidate r.reset(r.Term + 1) r.tick = r.tickElection r.Vote = r.id r.state = StateCandidate r.logger.Infof("%x became candidate at term %d", r.id, r.Term) } func (r *raft) becomeLeader() { // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == StateFollower { panic("invalid transition [follower -> leader]") } r.step = stepLeader r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader // 省略 r.reduceUncommittedSize([]pb.Entry{emptyEnt}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) }
今回はetcdをシングルノード, leaderで動かしているのでstep関数はstepLeader関数となります. これでようやくそれっぽいメッセージの実処理を担っていそうなstepLeader関数にたどり着きました
func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { // 省略 case pb.MsgProp: log.Println("Debug: leader step function") if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } if r.prs.Progress[r.id] == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. return ErrProposalDropped } if r.leadTransferee != None { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } for i := range m.Entries { e := &m.Entries[i] var cc pb.ConfChangeI if e.Type == pb.EntryConfChange { var ccc pb.ConfChange if err := ccc.Unmarshal(e.Data); err != nil { panic(err) } cc = ccc } else if e.Type == pb.EntryConfChangeV2 { var ccc pb.ConfChangeV2 if err := ccc.Unmarshal(e.Data); err != nil { panic(err) } cc = ccc } if cc != nil { alreadyPending := r.pendingConfIndex > r.raftLog.applied alreadyJoint := len(r.prs.Config.Voters[1]) > 0 wantsLeaveJoint := len(cc.AsV2().Changes) == 0 var refused string if alreadyPending { refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied) } else if alreadyJoint && !wantsLeaveJoint { refused = "must transition out of joint config first" } else if !alreadyJoint && wantsLeaveJoint { refused = "not in joint state; refusing empty conf change" } if refused != "" { r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} } else { r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 } } } if !r.appendEntry(m.Entries...) { return ErrProposalDropped } r.bcastAppend() return nil
送信したメッセージの種別はデバッグしたところpb.MsgProp
だったので上記のcase文内に移動します.
注目してほしいのは関数後部の以下の処理です.
if !r.appendEntry(m.Entries...) { return ErrProposalDropped } r.bcastAppend() return nil
Raftは論文内でリクエストの合意をチェック後, 処理するAPIとしてAppendEntryというインタフェースを定義しています. ということでおそらくここでRaft内におけるログの追加を行なっていると考察してappendEntryをみてみます
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { li := r.raftLog.lastIndex() for i := range es { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } // Track the size of this uncommitted proposal. if !r.increaseUncommittedSize(es) { r.logger.Debugf( "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id, ) // Drop the proposal. return false } // use latest "last" index after truncate/append li = r.raftLog.append(es...) r.prs.Progress[r.id].MaybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() return true }
ビンゴでした. Raftはデータの整合性を保つためにWALを行います.
さらに, 処理したログのインデックスを保存し, リーダーが変わった時などに照合することで自分が本当に正しいデータの状況かどうかをチェックすることができます.
このappendEntry
でログを追加し, 暫定的なcommit状況にした後にbcastAppend
でfollower全体に処理を通知, 合意されたならばmaybeCommitから永続的なcommitに変更されます.
これでPutにおけるキーの追加が完了されました.
まとめ
最後の実装周りが駆け足になってしまい申し訳ありません. 本記事では, NoSQLの紹介, ついでにNewSQLの紹介, そしてNoSQLの一種であるetcdについて述べました.
分散KVSは分散システムのもっともディープでベアメタル?な感じがして調べていて非常に面白いです. この記事で少しでも興味をもっていただけたら嬉しいです.