SWEet

A Software Engineer Is Eating Technologies

入社して2年が経ったけど転職・・・しません

CyberAgent に入社して2年が経ちました。今日から3年目です。 入社してからは CIU というインフラ横断の組織で開発を主にしていました (入社した時はまだ SIA って組織名でしたけどね)

入社してから何をしてたか

AKE というコンテナ基盤を改修、ユーザさんのサポートをしていました。 そういったことを率先していたからかプロダクトオーナーの権限を一部委譲していただけたりもしました。

それから AKE のリプレイスということで AKE v2 の技術調査、一部コンポーネントの設計なんかを色々やらせてもらいました。 詳しくはこの間の CA Developers Conference という社外向けの発表を見ていただけると。(僕もライブQ&Aに出てます)

www.youtube.com

あとは ML 基盤の開発なんかも始まって2軸体制になってコンテキストスイッチがなかなか大変でした。今も大変ですけど。 ML 基盤の話はこちらを見ていただけるのが早いです。

logmi.jp

という感じでプライベートクラウドだからこそできる開発を存分に楽しませていただきました。

これからは開発のピークは超えてきているので運用も本格的に始まってくるので SRE や SA 的立ち回りもしっかりできてくるといいですね。 開発してはいおしまいというのは昨今の潮流にも反しますので。

ちなみに外部登壇とかだとソフトウェアエンジニアって名乗ってます。理由としてはインフラだけ見てるわけではないし SRE 的立ち位置でもないし、でもコードは書いてるし運用もしてるなぁということで良い落とし所がソフトウェアエンジニアって肩書きだったからですね。CIU のこと知らない人に説明するとき苦労してます。

発表系

正直外部発表とか他の人に批判されそうなので乗り気ではなかったりするんですが、せっかく頑張ったことを発表させてくれる機会があるならということで色々やってました。 これからもちょくちょくできたらいいですね。

event.cloudnativedays.jp

event.cloudnativedays.jp

www.youtube.com

OSS

Cluster API を AKE で使ってるのですこーしだけコントリビュートさせてもらったりしました。 Cluster API は難しいプロダクトなんですが、これからも継続的に改善のお手伝いをできたらいいなって感じです。 ぶっちゃけ Cluster API Provider OpenStack Provider は AKE で独自実装して Origin でも使えそうなコードが結構あるんですが腰が上がらずにいます。やる気がほしい

github.com

github.com

これから

興味あることをやりつつしっかり会社の事業とかに貢献できたらいいなって思います。 この会社いろんな人やプロダクトがあるので見てて飽きません。

ということで CIU では人を募集していますので上記のプロダクトに関わりたいという方は是非応募お願い致します。

hrmos.co

hrmos.co

Custom Controller の開発で使えそうなコード集 #Kubernetes2 Advent Calendar 2020 16日目

この記事は Kubernetes2 Advent Calendar 2020qiita.com の16日目の記事です。

Custom Controller で よく使う、使えそう なコードの断片をラフに紹介してみようと思います。

想定する環境

  • Kubebuilder v2
  • go 1.14

今回実装した CRD の example

apiVersion: nginx.k8s-2020.advent.calendar/v1
kind: Nginx
metadata:
  name: nginx-sample
spec:
  name: nginx-1
  replices: 1

Custom Controller のロジック

  • nginxリソースが作成されると nginx の Deployment と Service を作成
  • nginxリソースが削除されると nginx の Deployment と Service を削除

サンプルなので非常に簡単なものになっています。

以降のコードではこちらの CR をアプライ、Controller が処理する前提で話を進めます。

Delete Reconcile の開始

if !nginx.DeletionTimestamp.IsZero() {
    return r.reconcileDelete(ctx, logger, nginx)
}

Reconcile する Object の Timestamp を判定して削除処理に分岐させるコード。 reconcileDelete 関数は各自で実装しましょう。

NotFound を判別するコード

import (
        apierrors "k8s.io/apimachinery/pkg/api/errors"
)
    err := r.Get(ctx, req.NamespacedName, nginx)
    if err != nil {
        if apierrors.IsNotFound(err) {
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }

ok みたいな別の値で存在可否を返さないこともあるのでこういう書き方でチェックできます。

Patch Helper による Patch

    patchHelper, err := patch.NewHelper(nginx, r)
    if err != nil {
        return ctrl.Result{}, err
    }

    defer func() {
        if err := patchHelper.Patch(ctx, nginx); err != nil {
            if reterr == nil {
                reterr = errors.Wrapf(err, "error patching Nginx %s/%s", nginx.Namespace, nginx.Name)
            }
        }
    }()

sigs.k8s.io/cluster-api/util/patch パッケージを使用して、Reconcile 内で副作用的にリソースを書き換えた場合、必ず Patch 処理で更新を行うコード。 Before な Object のパラメータを Controller の内部で直接書き換えるのはどうなんだろうと思いますが、後述する Finalizer の実装などと相性が良いです。

Event Recorder で 処理内容を Event Resource として保存

Event Resource を用いることで、Controller の処理結果を kubectl get events で確認できるようにもなります。 まず、Reconciler に record.EventRecorder フィールドを追加しましょう。

import (
    ...
    "k8s.io/client-go/tools/record"
)


// NginxReconciler reconciles a Nginx object
type NginxReconciler struct {
    client.Client
    Log      logr.Logger
    Scheme   *runtime.Scheme
    Recorder record.EventRecorder
}

Reconciler の初期化で Recorder の方も初期化します。

   if err = (&controllers.NginxReconciler{
        Client:   mgr.GetClient(),
        Log:      ctrl.Log.WithName("controllers").WithName("Nginx"),
        Scheme:   mgr.GetScheme(),
        Recorder: mgr.GetEventRecorderFor("nginx-controller"),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "Nginx")
        os.Exit(1)
    }

Event を記録する場合は次のようにします

r.Recorder.Eventf(nginx, corev1.EventTypeNormal, "Reconcile", "Reconcile Nginx resource %q", nginx.Name)

EventType は他にもあるので用途に応じて使い分けましょう。

Controller のログにも同時に出力されるようになります

2020-12-15T20:59:28.936+0900 DEBUG   controller-runtime.manager.events   Normal  {"object": {"kind":"Nginx","namespace":"default","name":"nginx-sample","uid":"7787eec9-ef85-4d56-ad75-dd4807b077d0","apiVersion":"nginx.k8s-2020.advent.calendar/v1","resourceVersion":"21597"}, "reason": "Reconcile", "message": "Reconcile Nginx resource \"nginx-sample\""}

kubectl get events でも確認できます

2m8s        Normal    Reconcile                 nginx/nginx-sample                                           Reconcile Nginx resource "nginx-sample"

Finalizer の付与と削除

Finalizer はざっくり説明すると、その Object が削除される前に行う処理を定義するものです。 CR に従属する別のリソースを CR が削除される前に削除するというのを保証するために使われます。

Custom Controller ではこの Finalizer キーを Object に付与、削除する処理を書くことでこれらを実現することができます。

Finalizer の付与

import (
    "context"


    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    ...
)
    ...
    controllerutil.AddFinalizer(nginx, nginxv1.NginxFinalizer)

kubebuidler books には Object の metadata フィールドにキーを append するように書いていますが、こちらの方が楽に書けます。 先ほどの patchHelper を併用することで、キーを付与後に行う更新忘れもなくなります。

Finalizer の削除

CR の削除の前処理が終わったら最後は Finalizer を削除しましょう。 こちらの RemoveFinalizerdefer で呼ばないように注意しましょう。他の削除ロジックでエラーを返した場合も実行されて、意図せず CR が削除されてしまいます。 そうなった場合、従属する Object を kubectl で手で削除していかなくてはいけません。

func (r *NginxReconciler) reconcileDelete(ctx context.Context, logger logr.Logger, nginx *nginxv1.Nginx) (ctrl.Result, error) {
     // Delete some objects

    controllerutil.RemoveFinalizer(nginx, nginxv1.NginxFinalizer)
    return ctrl.Result{}, nil
}

OwnerRefrence 周りのTips

書こう思ったらすでに某書籍でしっかり書かれていたっぽいのでリンクだけ貼っておきます

github.com

ObjectReference で外部リソースへの依存を表現

import (corev1 "k8s.io/api/core/v1")

type NginxSpec struct {
     HogeRef *corev1.ObjectReference
}

これによって CR のフィールドに他リソースへの参照を持たせることができます。

apiVersion: nginx.k8s-2020.advent.calendar/v1
kind: Nginx
metadata:
  name: nginx-sample
spec:
  name: nginx-1
  replices: 1
  hogeRef:
    apiVersion: hoge.io
    kind: Hoge
    name: hogefuga

名前だけでも良い場合もあると思いますが、kindapiVersion によってコードによる参照がしやすくなります。 具体的には以下のような感じ

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

if nginx.Spec.HogeRef != nil {
    obj, err := GetExternalResource(ctx, r.Client, nginx.Spec.HogeRef, nginx.Namespace)
    
    // do something
}

func GetExternalResource(ctx context.Context, c client.Client, ref *corev1.ObjectReference, namespace string) (*unstructured.Unstructured, error) {
    obj := new(unstructured.Unstructured)
    obj.SetAPIVersion(ref.APIVersion)
    obj.SetKind(ref.Kind)
    obj.SetName(ref.Name)
    key := client.ObjectKey{Name: obj.GetName(), Namespace: namespace}
    if err := c.Get(ctx, key, obj); err != nil {
        return nil, errors.Wrapf(err, "failed to retrieve %s external object %q/%q", obj.GetKind(), key.Namespace, key.Name)
    }
    return obj, nil
}

注意しなくてはいけないのは、obj*Unstructured で返してるので、Goの構造体としてフィールドにメンバ変数を介してアクセスしたい場合はそれぞれの Object にマッピングする必要があります。 その場合は runtime パッケージの FromUnstructured を使うと楽にできます

import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

obj := &unstructured.Unstructured{}
out = &nginxv1.Hoge{}
runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &hoge)

番外編: PodTemplateSpec から Hashを作成する関数

Deployment から作成された Pod って <deployment name>-<PodTemplateSpec Hash>-<よくわからないハッシュ値っぽいやつ> という命名規則っぽいです(ドキュメントとかに書いてあるのかわからなかったので推測です この Spec からハッシュ値を作成するコードを下に書いておきます。

func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) {
    hasher.Reset()
    printer := spew.ConfigState{
        Indent:         " ",
        SortKeys:       true,
        DisableMethods: true,
        SpewKeys:       true,
    }
    printer.Fprintf(hasher, "%#v", objectToWrite)
}

// DeepHashObjectToString creates a unique hash string from a go object.
func DeepHashObjectToString(objectToWrite interface{}) string {
    hasher := md5.New()
    hash.DeepHashObject(hasher, objectToWrite)
    return hex.EncodeToString(hasher.Sum(nil)[0:])
}

この DeepHashObject 関数を用いて上位8byte を抽出して replicaset の名前にしたものが下記のコードになります

func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
         podTemplateSpecHasher := fnv.New32a()
    hashutil.DeepHashObject(podTemplateSpecHasher, *template)

    // Add collisionCount in the hash if it exists.
    if collisionCount != nil {
        collisionCountBytes := make([]byte, 8)
        binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount))
        podTemplateSpecHasher.Write(collisionCountBytes)
    }

    return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
}

コード自体は k8s.io/kubernetes/pkg/util/hash とかにあるんですが、go mod の依存関係のせいか、直接は import できないようです。

まとめ

いくつか Kubernetes の Custom Controller 開発で使えそうなコードを紹介させていただきました。 他の書籍や記事と被っているかもしれません。

参考

GitHub - jetstack/kubebuilder-sample-controller: k8s.io/sample-controller written with kubebuilder v2

入社しました

4月に渋谷の某企業に就職しました。 別に会社名隠したところですぐバレそうですが、なんか社名のっけたくなかっただけです。

会社どうですか

人格者かつ技術力高い人が多いのでストレスフリーです。 あと会社の文化的なものによるのかビジネスの観点もしっかりしている人が多い印象です。

今なにしているか

社内向けのKubernetes as a Servieを作っています。 OpenStackを触ったりKubernetesを触ったりする生活です。コードも書いてます。

CNDT2020で同期と一緒に発表するのでよかったら聞いてください。 大きいタスクはほぼ同期がやってくれたので僕は足りないものをひたすらコード書いて実装していくという仕事だったので気持ちは楽でした

君サーバサイドエンジニアじゃないの?なんでインフラエンジニアやってんの?

インフラエンジニアを名乗るにはおこがましいぐらい周りに詳しい方がたくさんいるのでインフラ寄りのソフトウェアエンジニアという表現が正しいです。 逆にコードを書くという点では自分が一番やってやるぞ!という気概で今働いてます。楽しいです。

インフラ寄りのソフトウェアを作るのが楽しそうだなとずっと思ってたので今の部署を選んだという感じです。 なので名刺もインフラエンジニアじゃなくてソフトウェアエンジニアという肩書きで刷ってもらいました。

KubernetesやOpenStack関連のOSSに詳しい方が多いのでその助けもあって早速ingress-nginxというソフトにちょっとしたコントリビュートもできたりしました。

f:id:kk_river108:20200702200408p:plain

これから何をしていくのか

Kubernetes as a Serviceの開発も続けるし、今まで触れてこなかったハードウェアについてもしっかり学んでいきたいです。 サーバとかストレージ触ってみたい。

最後に

研究生活のストレスから開放されたおかげであらゆることに菩薩の心で臨むことができていてすこぶる体調が良いです。 在学中にご心配かけた方も何人かいるんですが、その節はお世話になりました。

プログラミングを教えて欲しい人たちへ

最近「プログラミングを教えて欲しい」と言われることが多くなりました。 一々LINEやFacebookで長文書くのも面倒なのでここに僕の回答を書いておきます。

手取り足取り教えるのは無理

これは申し訳ないけど自分のことで手一杯なので有償無償問わず, つきっきりは無理という話。 その上でよくある質問について答えていきます

GoogleAmazon, Facebookみたいなインパクトのあるプログラミングができるようになりたい!

そんなの教えられるなら僕は真面目に仕事してないと思います。諦めましょう

Webサイトを作りたい

HTML, JavaScript, CSSでぐぐりましょう。

デザイン?BootstrapかMaterialDesignでいいんじゃないですかね?

最強の選択肢であるWordPressもありました。WordPressやりたかったらLAMPでググってください

ゲーム作りたい

Unityでググればいいんじゃないですかね?

リアルなやつだったらUE4とかなのかな?

なんかわからんけどプログラミングしてみたい

「プログラミング入門 windows」でググって適当に出てきた記事を実践してみればいいんじゃないですかね? 最近だとProgateとか良質なサイトもあるっぽいし。使ったことないから知らんけど

〇〇っぽいサービス作りたい

「〇〇 clone application」とかでググると英語だけど動画とかで既存のアプリケーションの内部アーキテクチャとかデザイン込みで説明してくれてる動画あるから、 それ参考にすればいいんじゃないですかね?英語わからんから知らんけど

なんの言語がオススメですか

全部。COBOLとかFORTRANみたいな化石はお勧めせんけど。

なんの勉強から始めればいいかわかりません

僕もわからん

とりあえず英語から始めれば良いと思います。これはマジで

まとめ

とりあえずググれ

Envoy ビルドメモ

妙に詰まったのでメモ

環境

GCE: Ubuntu 16.04 Goインストール済み

手順

justiceboi.github.io

まず, clang-9 + llvm を入れる(7.0以上が条件らしい)

依存パッケージインストール

{
sudo apt-get update
sudo apt-get upgrade
sudo apt-get install \
   libtool \
   cmake \
   automake \
   autoconf \
   make \
   ninja-build \
   curl \
   unzip \
   virtualenv
}

リポジトリクローンとビルドコンフィグの設定

git clone https://github.com/envoyproxy/envoy
cd envoy
echo "build --config=clang" >> ./tools/bazel.rc
echo 'build --action_env=PATH="/usr/local/bin:/opt/local/bin:/usr/bin:/bin"' >> ./tools/bazel.rc

go get -u github.com/bazelbuild/buildtools/buildifier && export BUILDIFIER_BIN=$GOPATH/bin/buildifier
go get -u github.com/bazelbuild/buildtools/buildozer && export BUILDOZER_BIN=$GOPATH/bin/buildozer

ビルド

bazel build //source/exe:envoy-static

初回は超長い

Kubernetes The Hard Way におけるNodeを跨いだPod間のパケットフロー

何をするのか

github.com

Kubernetesの構築は各種クラウドベンダによるサービスによって非常に簡易化されている. 自前のサーバにKubernetesをデプロイする場合もkubeadmのようなツールによって必要なバイナリを全て手でインストールするというような手間を取る必要はなくなってきている.

しかし, 上記のリポジトリではGCEで用意したUbuntuインスタンスに全て手動でKubernetesコンポーネントをインストールしていくというハンズオンを体験できる.

今回はそのハンズオンで構築したKubernetes上で実際にPodからPodへ向けてのパケットフローがどのように行われるかを検証した.

環境

検証の環境は以下の画像のようになる. 厳密には3台のマスタに3台のワーカがGCE上で動作しているが今回検証で使用したのは2つのノードだけである. f:id:kk_river108:20200220122814p:plain

ストーリー

今回は以下のストーリーにおけるパケットの転送の様子を見てみる

pod 1からpod 2へpingする

pod 1からpod 2へpingする

この時, Node 1上でcnio0 ネットワークインタフェース(NI)をtcpdumpすると以下のような出力になる

03:45:45.813652 IP 10.200.0.2 > 10.200.1.2: ICMP echo request, id 2816, seq 0, length 64
03:45:45.815508 IP 10.200.1.2 > 10.200.0.2: ICMP echo reply, id 2816, seq 0, length 64
03:45:46.814361 IP 10.200.0.2 > 10.200.1.2: ICMP echo request, id 2816, seq 1, length 64
03:45:46.814807 IP 10.200.1.2 > 10.200.0.2: ICMP echo reply, id 2816, seq 1, length 64
03:45:47.814526 IP 10.200.0.2 > 10.200.1.2: ICMP echo request, id 2816, seq 2, length 64
03:45:47.814916 IP 10.200.1.2 > 10.200.0.2: ICMP echo reply, id 2816, seq 2, length 64

Podに割り当てられたvethからcnio0に対してパケットが転送されていることがわかる.

次にcnio0からNode本来のNIであるens4にパケットが転送されているかを確認したところ以下のような出力になっていた

03:49:47.934135 IP worker-0.us-west1-c.c.xxxx.internal > 10.200.1.2: ICMP echo request, id 5376, seq 0, length 64
03:49:47.935690 IP 10.200.1.2 > worker-0.us-west1-c.c.xxxx.internal: ICMP echo reply, id 5376, seq 0, length 64

ドメイン名をnslookupするとNodeのアドレスにNATされていることがわかる

$ nslookup worker-2.us-west1-c.c.xxxx.internal
Server:     127.0.0.53
Address:    127.0.0.53#53

Non-authoritative answer:
Name:   worker-2.us-west1-c.c.xxxx.internal
Address: 10.240.0.20

ここまでの状況をまとめると以下の画像のようになる f:id:kk_river108:20200220132428p:plain

NodeのNIまでたどり着いたパケットは宛先が 10.200.0.0/24 に該当しないのでDefault Gatewayに転送する. この時のGWルータはGCE上で作成したVPCルータとなる.

kubernetes-the-hard-wayによる手順ではVPCルータに以下のルールを追加している

NAME                            NETWORK                  DEST_RANGE     NEXT_HOP                  PRIORITY
default-route-081879136902de56  kubernetes-the-hard-way  10.240.0.0/24  kubernetes-the-hard-way   1000
default-route-55199a5aa126d7aa  kubernetes-the-hard-way  0.0.0.0/0      default-internet-gateway  1000
kubernetes-route-10-200-0-0-24  kubernetes-the-hard-way  10.200.0.0/24  10.240.0.20               1000
kubernetes-route-10-200-1-0-24  kubernetes-the-hard-way  10.200.1.0/24  10.240.0.21               1000
kubernetes-route-10-200-2-0-24  kubernetes-the-hard-way  10.200.2.0/24  10.240.0.22               1000

これによって宛先10.200.1.2のパケットは 10.240.21に転送される.

ここまでの処理でパケットはNode 2のens4 NIに到達した f:id:kk_river108:20200220132349p:plain

Node2でのrouteは以下のようになっているのでcnio0に転送される. cnio0にはvethが接続されており, あとはpodのvethに転送された後, ペアのeth0に送信され,往路のパケット転送が完了する.

route
Kernel IP routing table
Destination     Gateway         Genmask         Flags Metric Ref    Use Iface
default         _gateway        0.0.0.0         UG    100    0        0 ens4
10.200.1.0      0.0.0.0         255.255.255.0   U     0      0        0 cnio0
_gateway        0.0.0.0         255.255.255.255 UH    100    0        0 ens4

復路もVPCルータを介してNode 1に届くまでは一緒だが, Pod 2から送出されるパケットの宛先IPは Pod 1のIPアドレスではなく, Node 1のens4のIPアドレスとなっている. これは下記のiptablesのルールによってSNATされているためである.

Chain POSTROUTING (policy ACCEPT 7 packets, 696 bytes)
 pkts bytes target     prot opt in     out     source               destination
47471 4663K KUBE-POSTROUTING  all  --  *      *       0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
   97  7561 CNI-be6576a86b4e6dd707f727f8  all  --  *      *       10.200.2.2           0.0.0.0/0            /* name: "bridge" id: "6f7cc23d7330f226cf75461157038a8718c6bb9b03a53a4bdfd7c1388d49fcfa" */
    0     0 CNI-014fd6fdf5198c5b6219910e  all  --  *      *       10.200.2.3           0.0.0.0/0            /* name: "bridge" id: "0c54384b9da08224e9733348a2ef7da105c285ebd50c62a0c30cebf0712e5e55" */

Chain CNI-be6576a86b4e6dd707f727f8 (1 references)
 pkts bytes target     prot opt in     out     source               destination
    0     0 ACCEPT     all  --  *      *       0.0.0.0/0            10.200.2.0/24        /* name: "bridge" id: "6f7cc23d7330f226cf75461157038a8718c6bb9b03a53a4bdfd7c1388d49fcfa" */
   97  7561 MASQUERADE  all  --  *      *       0.0.0.0/0           !224.0.0.0/4          /* name: "bridge" id: "6f7cc23d7330f226cf75461157038a8718c6bb9b03a53a4bdfd7c1388d49fcfa" */

VPCルータからNode 1に届けられたリプライパケットはPod1からリクエストされた際にiptablesで適用されたPOSTROUTINGテーブルのIPマスカレードのルールが icmpパケット内のシーケンスIDからPod1のIPへ逆NATする.

POSTROUTINGの返信における適用: unix.stackexchange.com

これによって宛先IPはPod1の 10.200.0.2/32 となることでcnio0にフォワーディングされ, cnio0がもつ内部テーブルから宛先のvethへパケットが転送され, Pod 1へ正しくルーティングされる.

まとめ

今回のハンズオンで実験した環境はdockerのネットワークと同等である.

=> マルチホストでのDocker Container間通信 第1回: Dockerネットワークの基礎 - UZABASE Tech Blog

そのためflannelのようなCNIにおけるvxlanによるパケットのラップやvtepによるMAC addressの解決は起こっていない. 基本的にはNodeのiptablesにおけるNATのみでNodeをまたいだpod間の通信は行われていることがわかった.

謝辞

今回疑問の解決に協力していただいたTwitterのフォロワーの方やKubernetes Slackの方々にこの場を借りてお礼申し上げます.

About etcd and NoSQL

はじめに

本記事は

adventar.org における19日目の記事になります.

Previous ICCV2019 Best paper SinGANのできる事できない事[実践編] - Qiita

Next @Shagamii による記事

となっていますのでよろしければこちらもどうぞ.

さて, 今回は昨今多く世に出てきているNoSQL(NewSQL)とそのうちの一つであるetcdについて述べます. etcdに関してよく知らないという方に向けて参考になれば幸いです.

NoSQL, NewSQLについて

NoSQLとは

aws.amazon.com

NoSQLの"No"の意味には様々な解釈がありますが, 僕自身では "Not Only SQL" だと考えています. 所謂, MySQLPostgreSQLのようなリレーショナルなデータ構造を持たないDBを指しています.

多くのWebソフトウェアでセッションのキャッシュなどで用いられているredisも代表的なNoSQLDBでしょう. redisやmemcachedなどは一般的にKVS(Key Value Store)と呼ばれており, シンプルなデータ構造とAPIが特徴です. KVSはプログラミング言語で言えばmapの構造と似ています.

map[string]string{
  "key_1": "value1",
  "key_2": "value2",
}

KVSはインメモリで実装されていることが多く, 読み書きが非常に高速に行えます. NoSQL = KVSとしている記事なども多いですが必ずしもそうではありません.

例えばMongoDBなどはKVSではなくドキュメントベースのDBになります. 仮に, ユーザーの次のようなプロフィールをリレーショナルDBで表現する場合,

Profile:
  - Name: "Katsuya Kawabe"
  - Summary:
    "Student of Shibaura Institute of Technology"
  - Education:
    - Bachelor student of SIT, 2014-2018
    - Master student of SIT, 2018-2020
  - Contact Info:
     - Twitter: @KKawabe108

テーブルを正規化したなら userテーブル, Universityテーブル, Contact Infoテーブルのように分けて, クエリを実行する際には各テーブルのIDを辿っていきアプリケーションに返すといった手法が一般的でしょう.

ドキュメントベースのDBのMongoDBではこれらの情報を一つのjsonを一行単位のデータとして保持します. こうすることでユーザーのデータをフェッチする際には一つのクエリで済みます.

また, 多くのドキュメントDBは他の行と関連性を持たせることが少ないことを想定しているのでRDBのようなリレーションを保持せず、スキーマレスなストレージとして活用できることがメリットであるとも言われています. 正確にはスキーマレスではなく, スキーマオンリード と呼ばれる読み取りの時に構造が解釈されるものです.

そのメリットとして, 例えば古い行のデータは名前が姓と名に分けていませんが何らかの事情で分けたくなったとします. アプリケーションのコードは次のようになるかもしれません

if (user != null && user.name && !user.first_name) {
  // 古いデータはfirst_nameが存在しない
  user.first_name = user.name.split(" ")[0];
}

コードはこれで良いですし, ドキュメントDBであれば追加するデータのスキーマを変更するだけでストレージ上の既存のデータには特段処理を加える必要はありません. RDBであれば ALTER TABLE ... のようにマイグレーション処理を行わなければならないでしょう.

昨今なメジャーなドキュメントDBでは

  • MongoDB
  • RethinkDB
  • CouchDB
  • Espresso

などがあります. 他にもGraphベースのDBなど調べれば沢山のNoSQL DBが存在しますがここでは割愛します.

NewSQL

NewSQLとは簡単に言えば NoSQL + SQL(With Transaction) といういいとこ取りなDBです NewSQL登場の背景としてはRDBがそのリレーショナルなスキーマに縛られたデータの保持方法からスケールアウトしづらい構造でした. ただ, 最近(5~10年前から)はビッグデータの流行によって, 高負荷なread and writeを処理できず, NoSQLが登場しました.

しかし, 人間は強欲なのでNoSQLにトランザクション機能まで欲しくなりました. そうした結果, 水平オートスケール, SQL機能を持ったNewSQLが登場したわけです.

有名なNewSQLの製品だと

  • CockroachDB
  • VoltDB
  • TiDB
  • Spanner(GCP)
  • F1(Google)

等が挙げられます. そもそも, NoSQLは行間のリレーショナルが存在せず, トランザクションが行われず行をロックしないので高速だったわけですが, 昨今様々なWebソフトウェアでも使用されているSpannerはこのトランザクションにおけるロック機構, データ構造を工夫することによって高速かつ水平オートスケール可能なNewSQLを実現しています.

その技術の変態度合いについては各所の記事で紹介されているのでそちらをご覧ください

また, CockroachDB, TiDBに関してはGoogle F1, Spannerの論文から着想を得たオープンソースなNewSQLで いわゆる HTAP(Hybrid Transactional and Analytical Processing) をサポートするデータベースです. 有名なAWSにおけるAuroraの代替と位置付けるものでもあります.

これらはまだ大きなWebサービスで使用されているという話は聞いていませんが大きく期待できるプロダクトだと勝手に思っています.

etcd

さて, ようやくタイトルにある通り, etcdの話をします.

etcdは先述したNoSQLでありKVSなDBです.

etcd.io

また, 紹介では分散KVSとも呼ばれることが多いです. 複数のキーにまたがる強力なトランザクションはサポートしていなかった気がするのでNewSQLではありません.

KVSはredisを始め, 有名なプロダクトが他にもたくさんあるわけですが, なぜetcdは開発されたのでしょうか

誕生の背景

その理由はシンプルで, 単純にプロダクトレディな高信頼な分散KVSがなかったから と考察します. redisやmemcachedはそもそもクラスタレベルで運用されるのが珍しく, クラスタ間の整合性も高速化のために結果整合性 という最終的に結果がいつか反映されればOKというスタンスで実装されています.

例えば図のようにredisのマスターにx=1という書き込みを行い, B, Cのスレーブには問題なくレプリケーションを行えても, スレーブAに反映する前にマスターがダウンし, スレーブAがマスターに昇格してしまった場合, マスタ-スレーブ間でのデータ不整合に繋がってしまいます. f:id:kk_river108:20191218220617p:plain

etcdではこういったクラスタによる運用を前提とし, なおかつノード間での強力なデータ一貫性である 強整合性 を担保することを目的としています. 強整合性は一般的にトレードオフとして, パフォーマンスが低くなったり, ネットワークの切断といったシステムフォールトに対する耐性が低くなったりといった問題がありますが, etcdはいくつかの工夫によってそのリスクを抑えています.

特徴

使用する際に確認できる主な特徴は以下の点です

  • データの読み書きをHTTPベースのリクエストで行える
  • KVSなので当然書き込み速度, 読み込む速度は高速
  • Raft(後述します)を用いたノード間でのデータ一貫性を担保

ユースケース

etcd.io のホームページによると Rook(Kubernetes nativeなストレージエンジン), CoreDNS, OpenStack などに用いられています. それぞれのプロダクト内でノード間のコンフィグ(他のノードのIDを鍵としてIPを解決する)ようなコンフィギュレーションを保持するのに用いられているのが多い印象です. HadoopにおけるZookeeperのような役割を想像していただけるとわかりやすいのではないでしょうか.

Raft

etcdをはじめとした昨今有名な分散KVS(TiKVなど)はバックエンドのコアロジックとしてRaftを使用していることが多いです. 説明しようと思ったのですが時間がなかったので論文とわかりやすい資料を貼っておきますので許してください.

raft.github.io このサイトの raft paperという項目から論文に飛べます

www.slideshare.net

上記のスライドにもありますが, ざっくばらんに言うと「リーダーとレプリケーション間で合意が取れた処理は全てのノードで結果が保証される」というものです. PaxosというGoogle内のChubbyやZookeeperなどにも使われている分散合意アルゴリズムが難しすぎたことによりRaftが生まれました.

Raftを実装することでKVSへの書き込み処理を全てのノードに伝搬させることができ, ネットワークの分断, ノードの唐突なダウンにも耐性がつきます.

実装

せっかくなのでRaftにキーを書き込むまでに何が起こっているのかをコードレベルで追ってみます. etcdはGoで実装されています. その理由としては

etcd is written in Go, which has excellent cross-platform support, small binaries and a great community behind it. Communication between etcd machines is handled via the Raft consensus algorithm.

といった理由だそうです.

それではetcdを起動して以下のクライアントツールを用いてキーを書き込みます etcdをビルドする際にはgit cloneしてルート直下で ./build で完了します.

ビルドしたバイナリを起動するには

$ ./bin/etcd 

で良いです

クライアントツールも同時にビルドされるので以下のコマンドで今回は実験しました.

 $ ./bin/etcdctl put mykey "hoge"

これにより, 内部でetcdのサーバを叩いてキーの登録を行ます. etcdのサーバはgRPCによるAPIを提供していてPutコマンドによるエントリーポイントが次のコードになります

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}

etcd/v3_server.go at master · etcd-io/etcd · GitHub

内部で s.raftRequest を呼び出してマスター, レプリケーション間でのデータの処理を行ます. その後, いくつかの関数を経由して processInternalRaftRequestOnce => Propose=> stepWait => stepWithWaitOption で関数の呼び出しが止まります.

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
    ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+maxGapBetweenApplyAndCommitIndex {
        return nil, ErrTooManyRequests
    }

    r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

    authInfo, err := s.AuthInfoFromCtx(ctx)
    if err != nil {
        return nil, err
    }
    if authInfo != nil {
        r.Header.Username = authInfo.Username
        r.Header.AuthRevision = authInfo.Revision
    }

    data, err := r.Marshal()
    if err != nil {
        return nil, err
    }

    if len(data) > int(s.Cfg.MaxRequestBytes) {
        return nil, ErrRequestTooLarge
    }

    id := r.ID
    if id == 0 {
        id = r.Header.ID
    }
    ch := s.w.Register(id)

    cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    defer cancel()

    start := time.Now()
    err = s.r.Propose(cctx, data)
         
        // 以下略
}
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    if m.Type != pb.MsgProp {
        select {
        case n.recvc <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }
    ch := n.propc
    pm := msgWithResult{m: m}
    if wait {
        pm.result = make(chan error, 1)
    }
    select {
    case ch <- pm:
        if !wait {
            return nil
        }
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    select {
    case err := <-pm.result:
        if err != nil {
            return err
        }
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    return nil
}

stepWithWaitOption では特に何もしていないように一見みえるのですが

ch := n.propc
    pm := msgWithResult{m: m}
    if wait {
        pm.result = make(chan error, 1)
    }
    select {
    case ch <- pm:
        if !wait {
            return nil
        }

このチャネルによる書き込みで別goroutineでリッスンしているハンドラーが起動します.

具体的にはここからraftのコードに移行していきます

func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
  
                 // 中略


             select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case pm := <-propc:
            log.Println("Debug:  execute propc")
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

先ほどmessageを書き込んだchannelをここで読み込み r.Step 関数に渡しているのがわかります. Step関数も非常に長いので省略しますが, Raft内で使用される論理的な時間のtermのチェックをした後, 問題なければさらに内部のstep関数へ処理を渡します

func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {

         // 中略

         default:
        if m.Type == pb.MsgProp {
            log.Println("Debug: raft.Step", m.String())
        }
        err := r.step(r, m)
        if err != nil {
            return err
        }
    }
    return nil
}

step関数構造体内で関数ポインタとして定義されており, raftの状態(leader, candidate, follower)により関数の実態が変わります

type raft struct {
     // 省略
     step stepFunc
}

状態が変化するごとに関数を以下のように置き換えています

func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) becomeCandidate() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateLeader {
        panic("invalid transition [leader -> candidate]")
    }
    r.step = stepCandidate
    r.reset(r.Term + 1)
    r.tick = r.tickElection
    r.Vote = r.id
    r.state = StateCandidate
    r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomeLeader() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateFollower {
        panic("invalid transition [follower -> leader]")
    }
    r.step = stepLeader
    r.reset(r.Term)
    r.tick = r.tickHeartbeat
    r.lead = r.id
    r.state = StateLeader

        // 省略
    r.reduceUncommittedSize([]pb.Entry{emptyEnt})
    r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

今回はetcdをシングルノード, leaderで動かしているのでstep関数はstepLeader関数となります. これでようやくそれっぽいメッセージの実処理を担っていそうなstepLeader関数にたどり着きました

func stepLeader(r *raft, m pb.Message) error {
    // These message types do not require any progress for m.From.
    switch m.Type {

         // 省略

    case pb.MsgProp:
        log.Println("Debug: leader step function")
        if len(m.Entries) == 0 {
            r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        if r.prs.Progress[r.id] == nil {
            // If we are not currently a member of the range (i.e. this node
            // was removed from the configuration while serving as leader),
            // drop any new proposals.
            return ErrProposalDropped
        }
        if r.leadTransferee != None {
            r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
            return ErrProposalDropped
        }

        for i := range m.Entries {
            e := &m.Entries[i]
            var cc pb.ConfChangeI
            if e.Type == pb.EntryConfChange {
                var ccc pb.ConfChange
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            } else if e.Type == pb.EntryConfChangeV2 {
                var ccc pb.ConfChangeV2
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            }
            if cc != nil {
                alreadyPending := r.pendingConfIndex > r.raftLog.applied
                alreadyJoint := len(r.prs.Config.Voters[1]) > 0
                wantsLeaveJoint := len(cc.AsV2().Changes) == 0

                var refused string
                if alreadyPending {
                    refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
                } else if alreadyJoint && !wantsLeaveJoint {
                    refused = "must transition out of joint config first"
                } else if !alreadyJoint && wantsLeaveJoint {
                    refused = "not in joint state; refusing empty conf change"
                }

                if refused != "" {
                    r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
                    m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
                } else {
                    r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
                }
            }
        }

        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        r.bcastAppend()
        return nil

送信したメッセージの種別はデバッグしたところpb.MsgProp だったので上記のcase文内に移動します. 注目してほしいのは関数後部の以下の処理です.

if !r.appendEntry(m.Entries...) {
    return ErrProposalDropped
}
r.bcastAppend()
return nil

Raftは論文内でリクエストの合意をチェック後, 処理するAPIとしてAppendEntryというインタフェースを定義しています. ということでおそらくここでRaft内におけるログの追加を行なっていると考察してappendEntryをみてみます

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
    li := r.raftLog.lastIndex()
    for i := range es {
        es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    // Track the size of this uncommitted proposal.
    if !r.increaseUncommittedSize(es) {
        r.logger.Debugf(
            "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
            r.id,
        )
        // Drop the proposal.
        return false
    }
    // use latest "last" index after truncate/append
    li = r.raftLog.append(es...)
    r.prs.Progress[r.id].MaybeUpdate(li)
    // Regardless of maybeCommit's return, our caller will call bcastAppend.
    r.maybeCommit()
    return true
}

ビンゴでした. Raftはデータの整合性を保つためにWALを行います. さらに, 処理したログのインデックスを保存し, リーダーが変わった時などに照合することで自分が本当に正しいデータの状況かどうかをチェックすることができます. このappendEntryでログを追加し, 暫定的なcommit状況にした後にbcastAppend でfollower全体に処理を通知, 合意されたならばmaybeCommitから永続的なcommitに変更されます.

これでPutにおけるキーの追加が完了されました.

まとめ

最後の実装周りが駆け足になってしまい申し訳ありません. 本記事では, NoSQLの紹介, ついでにNewSQLの紹介, そしてNoSQLの一種であるetcdについて述べました.

分散KVSは分散システムのもっともディープでベアメタル?な感じがして調べていて非常に面白いです. この記事で少しでも興味をもっていただけたら嬉しいです.

参考