SWEet

A Software Engineer Is Eating Technologies

About etcd and NoSQL

はじめに

本記事は

adventar.org における19日目の記事になります.

Previous ICCV2019 Best paper SinGANのできる事できない事[実践編] - Qiita

Next @Shagamii による記事

となっていますのでよろしければこちらもどうぞ.

さて, 今回は昨今多く世に出てきているNoSQL(NewSQL)とそのうちの一つであるetcdについて述べます. etcdに関してよく知らないという方に向けて参考になれば幸いです.

NoSQL, NewSQLについて

NoSQLとは

aws.amazon.com

NoSQLの"No"の意味には様々な解釈がありますが, 僕自身では "Not Only SQL" だと考えています. 所謂, MySQLPostgreSQLのようなリレーショナルなデータ構造を持たないDBを指しています.

多くのWebソフトウェアでセッションのキャッシュなどで用いられているredisも代表的なNoSQLDBでしょう. redisやmemcachedなどは一般的にKVS(Key Value Store)と呼ばれており, シンプルなデータ構造とAPIが特徴です. KVSはプログラミング言語で言えばmapの構造と似ています.

map[string]string{
  "key_1": "value1",
  "key_2": "value2",
}

KVSはインメモリで実装されていることが多く, 読み書きが非常に高速に行えます. NoSQL = KVSとしている記事なども多いですが必ずしもそうではありません.

例えばMongoDBなどはKVSではなくドキュメントベースのDBになります. 仮に, ユーザーの次のようなプロフィールをリレーショナルDBで表現する場合,

Profile:
  - Name: "Katsuya Kawabe"
  - Summary:
    "Student of Shibaura Institute of Technology"
  - Education:
    - Bachelor student of SIT, 2014-2018
    - Master student of SIT, 2018-2020
  - Contact Info:
     - Twitter: @KKawabe108

テーブルを正規化したなら userテーブル, Universityテーブル, Contact Infoテーブルのように分けて, クエリを実行する際には各テーブルのIDを辿っていきアプリケーションに返すといった手法が一般的でしょう.

ドキュメントベースのDBのMongoDBではこれらの情報を一つのjsonを一行単位のデータとして保持します. こうすることでユーザーのデータをフェッチする際には一つのクエリで済みます.

また, 多くのドキュメントDBは他の行と関連性を持たせることが少ないことを想定しているのでRDBのようなリレーションを保持せず、スキーマレスなストレージとして活用できることがメリットであるとも言われています. 正確にはスキーマレスではなく, スキーマオンリード と呼ばれる読み取りの時に構造が解釈されるものです.

そのメリットとして, 例えば古い行のデータは名前が姓と名に分けていませんが何らかの事情で分けたくなったとします. アプリケーションのコードは次のようになるかもしれません

if (user != null && user.name && !user.first_name) {
  // 古いデータはfirst_nameが存在しない
  user.first_name = user.name.split(" ")[0];
}

コードはこれで良いですし, ドキュメントDBであれば追加するデータのスキーマを変更するだけでストレージ上の既存のデータには特段処理を加える必要はありません. RDBであれば ALTER TABLE ... のようにマイグレーション処理を行わなければならないでしょう.

昨今なメジャーなドキュメントDBでは

  • MongoDB
  • RethinkDB
  • CouchDB
  • Espresso

などがあります. 他にもGraphベースのDBなど調べれば沢山のNoSQL DBが存在しますがここでは割愛します.

NewSQL

NewSQLとは簡単に言えば NoSQL + SQL(With Transaction) といういいとこ取りなDBです NewSQL登場の背景としてはRDBがそのリレーショナルなスキーマに縛られたデータの保持方法からスケールアウトしづらい構造でした. ただ, 最近(5~10年前から)はビッグデータの流行によって, 高負荷なread and writeを処理できず, NoSQLが登場しました.

しかし, 人間は強欲なのでNoSQLにトランザクション機能まで欲しくなりました. そうした結果, 水平オートスケール, SQL機能を持ったNewSQLが登場したわけです.

有名なNewSQLの製品だと

  • CockroachDB
  • VoltDB
  • TiDB
  • Spanner(GCP)
  • F1(Google)

等が挙げられます. そもそも, NoSQLは行間のリレーショナルが存在せず, トランザクションが行われず行をロックしないので高速だったわけですが, 昨今様々なWebソフトウェアでも使用されているSpannerはこのトランザクションにおけるロック機構, データ構造を工夫することによって高速かつ水平オートスケール可能なNewSQLを実現しています.

その技術の変態度合いについては各所の記事で紹介されているのでそちらをご覧ください

また, CockroachDB, TiDBに関してはGoogle F1, Spannerの論文から着想を得たオープンソースなNewSQLで いわゆる HTAP(Hybrid Transactional and Analytical Processing) をサポートするデータベースです. 有名なAWSにおけるAuroraの代替と位置付けるものでもあります.

これらはまだ大きなWebサービスで使用されているという話は聞いていませんが大きく期待できるプロダクトだと勝手に思っています.

etcd

さて, ようやくタイトルにある通り, etcdの話をします.

etcdは先述したNoSQLでありKVSなDBです.

etcd.io

また, 紹介では分散KVSとも呼ばれることが多いです. 複数のキーにまたがる強力なトランザクションはサポートしていなかった気がするのでNewSQLではありません.

KVSはredisを始め, 有名なプロダクトが他にもたくさんあるわけですが, なぜetcdは開発されたのでしょうか

誕生の背景

その理由はシンプルで, 単純にプロダクトレディな高信頼な分散KVSがなかったから と考察します. redisやmemcachedはそもそもクラスタレベルで運用されるのが珍しく, クラスタ間の整合性も高速化のために結果整合性 という最終的に結果がいつか反映されればOKというスタンスで実装されています.

例えば図のようにredisのマスターにx=1という書き込みを行い, B, Cのスレーブには問題なくレプリケーションを行えても, スレーブAに反映する前にマスターがダウンし, スレーブAがマスターに昇格してしまった場合, マスタ-スレーブ間でのデータ不整合に繋がってしまいます. f:id:kk_river108:20191218220617p:plain

etcdではこういったクラスタによる運用を前提とし, なおかつノード間での強力なデータ一貫性である 強整合性 を担保することを目的としています. 強整合性は一般的にトレードオフとして, パフォーマンスが低くなったり, ネットワークの切断といったシステムフォールトに対する耐性が低くなったりといった問題がありますが, etcdはいくつかの工夫によってそのリスクを抑えています.

特徴

使用する際に確認できる主な特徴は以下の点です

  • データの読み書きをHTTPベースのリクエストで行える
  • KVSなので当然書き込み速度, 読み込む速度は高速
  • Raft(後述します)を用いたノード間でのデータ一貫性を担保

ユースケース

etcd.io のホームページによると Rook(Kubernetes nativeなストレージエンジン), CoreDNS, OpenStack などに用いられています. それぞれのプロダクト内でノード間のコンフィグ(他のノードのIDを鍵としてIPを解決する)ようなコンフィギュレーションを保持するのに用いられているのが多い印象です. HadoopにおけるZookeeperのような役割を想像していただけるとわかりやすいのではないでしょうか.

Raft

etcdをはじめとした昨今有名な分散KVS(TiKVなど)はバックエンドのコアロジックとしてRaftを使用していることが多いです. 説明しようと思ったのですが時間がなかったので論文とわかりやすい資料を貼っておきますので許してください.

raft.github.io このサイトの raft paperという項目から論文に飛べます

www.slideshare.net

上記のスライドにもありますが, ざっくばらんに言うと「リーダーとレプリケーション間で合意が取れた処理は全てのノードで結果が保証される」というものです. PaxosというGoogle内のChubbyやZookeeperなどにも使われている分散合意アルゴリズムが難しすぎたことによりRaftが生まれました.

Raftを実装することでKVSへの書き込み処理を全てのノードに伝搬させることができ, ネットワークの分断, ノードの唐突なダウンにも耐性がつきます.

実装

せっかくなのでRaftにキーを書き込むまでに何が起こっているのかをコードレベルで追ってみます. etcdはGoで実装されています. その理由としては

etcd is written in Go, which has excellent cross-platform support, small binaries and a great community behind it. Communication between etcd machines is handled via the Raft consensus algorithm.

といった理由だそうです.

それではetcdを起動して以下のクライアントツールを用いてキーを書き込みます etcdをビルドする際にはgit cloneしてルート直下で ./build で完了します.

ビルドしたバイナリを起動するには

$ ./bin/etcd 

で良いです

クライアントツールも同時にビルドされるので以下のコマンドで今回は実験しました.

 $ ./bin/etcdctl put mykey "hoge"

これにより, 内部でetcdのサーバを叩いてキーの登録を行ます. etcdのサーバはgRPCによるAPIを提供していてPutコマンドによるエントリーポイントが次のコードになります

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}

etcd/v3_server.go at master · etcd-io/etcd · GitHub

内部で s.raftRequest を呼び出してマスター, レプリケーション間でのデータの処理を行ます. その後, いくつかの関数を経由して processInternalRaftRequestOnce => Propose=> stepWait => stepWithWaitOption で関数の呼び出しが止まります.

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
    ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+maxGapBetweenApplyAndCommitIndex {
        return nil, ErrTooManyRequests
    }

    r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

    authInfo, err := s.AuthInfoFromCtx(ctx)
    if err != nil {
        return nil, err
    }
    if authInfo != nil {
        r.Header.Username = authInfo.Username
        r.Header.AuthRevision = authInfo.Revision
    }

    data, err := r.Marshal()
    if err != nil {
        return nil, err
    }

    if len(data) > int(s.Cfg.MaxRequestBytes) {
        return nil, ErrRequestTooLarge
    }

    id := r.ID
    if id == 0 {
        id = r.Header.ID
    }
    ch := s.w.Register(id)

    cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    defer cancel()

    start := time.Now()
    err = s.r.Propose(cctx, data)
         
        // 以下略
}
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    if m.Type != pb.MsgProp {
        select {
        case n.recvc <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }
    ch := n.propc
    pm := msgWithResult{m: m}
    if wait {
        pm.result = make(chan error, 1)
    }
    select {
    case ch <- pm:
        if !wait {
            return nil
        }
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    select {
    case err := <-pm.result:
        if err != nil {
            return err
        }
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    return nil
}

stepWithWaitOption では特に何もしていないように一見みえるのですが

ch := n.propc
    pm := msgWithResult{m: m}
    if wait {
        pm.result = make(chan error, 1)
    }
    select {
    case ch <- pm:
        if !wait {
            return nil
        }

このチャネルによる書き込みで別goroutineでリッスンしているハンドラーが起動します.

具体的にはここからraftのコードに移行していきます

func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
  
                 // 中略


             select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case pm := <-propc:
            log.Println("Debug:  execute propc")
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

先ほどmessageを書き込んだchannelをここで読み込み r.Step 関数に渡しているのがわかります. Step関数も非常に長いので省略しますが, Raft内で使用される論理的な時間のtermのチェックをした後, 問題なければさらに内部のstep関数へ処理を渡します

func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {

         // 中略

         default:
        if m.Type == pb.MsgProp {
            log.Println("Debug: raft.Step", m.String())
        }
        err := r.step(r, m)
        if err != nil {
            return err
        }
    }
    return nil
}

step関数構造体内で関数ポインタとして定義されており, raftの状態(leader, candidate, follower)により関数の実態が変わります

type raft struct {
     // 省略
     step stepFunc
}

状態が変化するごとに関数を以下のように置き換えています

func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) becomeCandidate() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateLeader {
        panic("invalid transition [leader -> candidate]")
    }
    r.step = stepCandidate
    r.reset(r.Term + 1)
    r.tick = r.tickElection
    r.Vote = r.id
    r.state = StateCandidate
    r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomeLeader() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateFollower {
        panic("invalid transition [follower -> leader]")
    }
    r.step = stepLeader
    r.reset(r.Term)
    r.tick = r.tickHeartbeat
    r.lead = r.id
    r.state = StateLeader

        // 省略
    r.reduceUncommittedSize([]pb.Entry{emptyEnt})
    r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

今回はetcdをシングルノード, leaderで動かしているのでstep関数はstepLeader関数となります. これでようやくそれっぽいメッセージの実処理を担っていそうなstepLeader関数にたどり着きました

func stepLeader(r *raft, m pb.Message) error {
    // These message types do not require any progress for m.From.
    switch m.Type {

         // 省略

    case pb.MsgProp:
        log.Println("Debug: leader step function")
        if len(m.Entries) == 0 {
            r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        if r.prs.Progress[r.id] == nil {
            // If we are not currently a member of the range (i.e. this node
            // was removed from the configuration while serving as leader),
            // drop any new proposals.
            return ErrProposalDropped
        }
        if r.leadTransferee != None {
            r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
            return ErrProposalDropped
        }

        for i := range m.Entries {
            e := &m.Entries[i]
            var cc pb.ConfChangeI
            if e.Type == pb.EntryConfChange {
                var ccc pb.ConfChange
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            } else if e.Type == pb.EntryConfChangeV2 {
                var ccc pb.ConfChangeV2
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            }
            if cc != nil {
                alreadyPending := r.pendingConfIndex > r.raftLog.applied
                alreadyJoint := len(r.prs.Config.Voters[1]) > 0
                wantsLeaveJoint := len(cc.AsV2().Changes) == 0

                var refused string
                if alreadyPending {
                    refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
                } else if alreadyJoint && !wantsLeaveJoint {
                    refused = "must transition out of joint config first"
                } else if !alreadyJoint && wantsLeaveJoint {
                    refused = "not in joint state; refusing empty conf change"
                }

                if refused != "" {
                    r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
                    m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
                } else {
                    r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
                }
            }
        }

        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        r.bcastAppend()
        return nil

送信したメッセージの種別はデバッグしたところpb.MsgProp だったので上記のcase文内に移動します. 注目してほしいのは関数後部の以下の処理です.

if !r.appendEntry(m.Entries...) {
    return ErrProposalDropped
}
r.bcastAppend()
return nil

Raftは論文内でリクエストの合意をチェック後, 処理するAPIとしてAppendEntryというインタフェースを定義しています. ということでおそらくここでRaft内におけるログの追加を行なっていると考察してappendEntryをみてみます

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
    li := r.raftLog.lastIndex()
    for i := range es {
        es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    // Track the size of this uncommitted proposal.
    if !r.increaseUncommittedSize(es) {
        r.logger.Debugf(
            "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
            r.id,
        )
        // Drop the proposal.
        return false
    }
    // use latest "last" index after truncate/append
    li = r.raftLog.append(es...)
    r.prs.Progress[r.id].MaybeUpdate(li)
    // Regardless of maybeCommit's return, our caller will call bcastAppend.
    r.maybeCommit()
    return true
}

ビンゴでした. Raftはデータの整合性を保つためにWALを行います. さらに, 処理したログのインデックスを保存し, リーダーが変わった時などに照合することで自分が本当に正しいデータの状況かどうかをチェックすることができます. このappendEntryでログを追加し, 暫定的なcommit状況にした後にbcastAppend でfollower全体に処理を通知, 合意されたならばmaybeCommitから永続的なcommitに変更されます.

これでPutにおけるキーの追加が完了されました.

まとめ

最後の実装周りが駆け足になってしまい申し訳ありません. 本記事では, NoSQLの紹介, ついでにNewSQLの紹介, そしてNoSQLの一種であるetcdについて述べました.

分散KVSは分散システムのもっともディープでベアメタル?な感じがして調べていて非常に面白いです. この記事で少しでも興味をもっていただけたら嬉しいです.

参考