RDBMSの先を行く?NewSQLを支えるアルゴリズムRaftをGoで紐解く

f:id:kaminashi-developer:20210226020850p:plain

初めまして。株式会社カミナシPMの@gtongy1です。

みなさんはNewSQLをご存知ですか?
強い整合性を持つ分散型のSQLデータベースサービスのことをNewSQLと呼びます。
RDBMSではなし得なかった分散アーキテクチャを、またNoSQLではなし得なかった強い整合性をいいとこ取りした新しいSQLデータベースサービスです。
なんかとても理想的な仕組みに見えますね。この裏にはどのような知識が詰め込まれているのでしょうか。

今回はそんなNewSQLを支える仕組みを一緒に紐解いていきましょう!

NewSQLが乗り越えた壁

どんな仕組みが動いているのか、の前にNewSQLはこのSQLデータベース界へ何を投げ込んだのでしょうか。
NewSQLには以下のような特徴があります。

NewSQLの有名所であるCockroachDBでいうと、この仕組みを以下のようなArchitectureで実装をしています。
各階層ごとでうまく役割分担しているのが見て取れます。

f:id:kaminashi-developer:20210224090822p:plain
Cockroach Architecture

github.com

分散されたKVS内で複数のNodeとStoreで領域分けられていることが分かります。
では、このStore中身はどうなっているのでしょうか。中を見てみましょう。

f:id:kaminashi-developer:20210224230124p:plain

このように、Storeの内部には複数のRangeと呼ばれる領域を含んでおり、この色分けされている中で共通な部分はレプリカとして生成されています。
つまり、Store間を跨いだ状態で 同一Rangeの関連を持つことが出来るのです

  • NoSQLでは実現不可能だったACIDサポートのトランザクション/SQL-Likeなクエリ言語を記述出来なかった心労を乗り越えていること
  • またRDBMSでは苦労するShardingの仕組みを、分散システムとして内包しマネージドな形でのサービスの提供を行っていること

がNewSQLがこれまでの仕組みを乗り越えた壁であることがわかります。

では一体、どのような仕組みを使いレプリカ間でのデータの関連性を担保しているのでしょうか?
その謎に迫るためにはRaftについて説明していくことにしましょう!

なぜRaftが使われるのか

詳細を話す前に、なぜRaftが使われてきたのでしょうか。その実体に迫ります。
分散システムを実際に設計していくことになると、複数のノードに分散されたアプリを協調する必要があります。
例えば、どのノードで更新がかかったのか、その更新の値はどんなものなのか、そして今その値は更新しても良いものなのか等。
分散されているが故に、他のノードの状態がわかりづらくなってしまいます。
そこでノード間の関係に一貫性を保つアルゴリズムこそがRaftです。

thesecretlivesofdata.com

Raft登場以前はPaxosと呼ばれるアルゴリズムが主流でした。 このアルゴリズム自体実装がとにかく難しい(らしく)、Raftが流行っていったそうです。
なぜそんなに差が生まれたのでしょうか?
実はRaftのアルゴリズム

のこの2つが最低機能。つまりこれだけを覚えると実装出来るんです!とてもシンプルですね。
そんなRaftのコードの中身気になって来ましたか?
OKです!順を追ってコードを使いながら説明していきますね。

リーダー選出(Leader Election)

gyazo.com

上記のgifをご覧ください。図を見てどのような変化が見られるでしょうか?

  • S1~S5のノードが存在し、そのノードそれぞれがタイマーを持っている
  • 最もタイマーが早く終わったS1が、他のノードに対してメッセージを送信している
  • メッセージを受け取ったS2~S5がレスポンスを返却し、S1へ返している
  • S2~S5から3つのメッセージを受け取ったS1はステータスが変わっている
  • S1の色は変化し、S1からメッセージを送っている

このような流れがあったのではないでしょうか?これこそがリーダー選出(Leader Election)の動作になります。
まず上記の流れを説明するために、ノードのステータスに対して説明していく必要があります。
Raftにおいて、ノードは以下の3つのステータスが動的に切り替わっています。

  • Leader
  • Candidate
  • Follower

f:id:kaminashi-developer:20210225204656p:plain

func NewRaft() (*Raft, error) {
    // ...
    go r.run()
    return r, nil
}

func (r *Raft) run() {
    for {
        select {
        switch r.getState() {
        case Follower: r.runFollower()
        case Candidate: r.runCandidate()
        case Leader: r.runLeader()
        }
    }
}

func (r *Raft) runFollower() {
    for {
        select {
        // 時間切れでcandidateへstateの変更
        case <-randomTimeout(r.config.HeartbeatTimeout):
            logrus.Info("heartbeat timeout reached, starting election")
          // Candidateへの昇格
            r.setState(Candidate)
            return
    }
}
func (r *Raft) runCandidate() {
    // 投票リクエストの送信、channelの作成
    voteCh := r.electSelf()
    grantedVotes := 0
    votesNeeded := r.quorumSize()
    for {
        select {
      // 投票リクエストの受け取り
        case vote := <-voteCh:
            // ...
            // grantedな投票の票数確認
            if vote.VoteGranted {
                grantedVotes++
                logrus.Printf("vote granted. tally: %d", grantedVotes)
            }

            // 過半数表を得ているか。leaderになり得るか
            if grantedVotes >= votesNeeded {
                logrus.Printf("election won. tally: %d", grantedVotes)
              // Leaderへの昇格
                r.setState(Leader)
                return
            }
            // ...
    }
}

上記実装の中では、

  1. 最初はどのノードもFollowerから開始
  2. 初回Followerから開始し、各node内で設定されているタイマー(HeartbeatTimeout)で設定された時間が経ったのちCandidateへ状態遷移
  3. その後各ノードから投票を受け(RequestVoteRPC)、この結果で投票が過半数を獲得したノードに対してLeaderへ状態遷移

の流れによって、リーダーの選出を行うことが出来ます。 goroutineのchannelを利用して1プロセス: 1ノードで表しています。

ログ複製(Log Replication)

gyazo.com

Raftはクライアントからの値の変更に対して、Leaderを介して更新を行います。
またその受け取った値に対して、Leaderは各Followerに対し変更の合意をとる必要があります。
この時のコマンドをログと呼び、このコマンド自体は永続化することにより、状態を保存しておきます。

// クライアントからのログの書き込み
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
    // ...
    logFuture := &logFuture{
        log: Log{
            Type: LogCommand,
            Data: cmd,
        },
        errCh: make(chan error, 1),
    }
    select {
    case r.applyCh <- logFuture:
        return logFuture
    }
}

func (r *Raft) runLeader() {
    // Leaderから各ノードに対して複製の実行
    for i, peer := range r.peers {
        go r.replicate(inflight, triggers[i], stopCh, peer)
    }
}

func (r *Raft) replicate(inflight *inflight, triggerCh, stopCh chan struct{}, peer net.Addr) {
    last := r.getLastLog()
    // Followerの対象Indexと次のIndexを取得
    indexes := followerReplication{
        matchIndex: last,
        nextIndex:  last + 1,
    }
    shouldStop := false
    for !shouldStop {
        select {

        case <-triggerCh:
            // 書き込みの実行
            shouldStop = r.replicateTo(inflight, &indexes, r.getLastLog(), peer)
        }
    }
}
func (r *Raft) replicateTo(inflight *inflight, indexes *followerReplication, lastIndex uint64, peer net.Addr) (shouldStop bool) {
    req = AppendEntriesRequest{
        Term:         r.getCurrentTerm(),
        LeaderID:     r.CandidateID(),
        LeaderCommit: 0,
    }
    // RPCの送信
    if err := r.trans.AppendEntries(peer, &req, &resp); err != nil {
        logrus.Printf("fail to AppendEntries to %v: %v", peer, err)
        return
    }
    // 送信成功時に永続化層の書き換え(Commit)
    if resp.Success {
        for i := indexes.matchIndex; i <= maxIndex; i++ {
            inflight.Commit(i)
        }
        indexes.matchIndex = maxIndex
        indexes.nextIndex = maxIndex + 1
    }
    return
}

ログ複製は以下の手順を踏み、値の更新を行っていきます。

  1. Leaderはログにコマンドを追加し、そのコマンドでAppendEntries RPCを送信
  2. 各ノードはローカルにエントリを追加し、成功すると返信
  3. Followerの過半数がログエントリのローカルコミットに成功すると、Leaderはコマンドをコミット。この時に成功応答をクライアントに返信
  4. 次のLeaderからのリクエストでは、更新されたコミットインデックスをすべてのFollowerに複製
  5. Leaderがエントリをコミットすると、現在のログインデックスより前のすべてのエントリもコミット

ログを複製するタイミングには、LeaderはFollowerに対し2回の合意を行う(最初にログへの書き込み、2回目には変更の合意)ことによって、更新を完了することが出来ます。

結論

この二つの仕組みを利用することにより

  • リーダーの選出を行うことにより、分散された各ノードに対して自動的に役割が選出されていく
    • 一つのノードが障害が起こった場合においても、再度投票が自動的に行われていくことによって、分散で散らばった各ノードに役割がつけられる
  • ノードに対しての変更はLeaderを介して、各Followerノードに通知される
    • その結果はLeaderは送信された結果に対して、またFollowerは受け取った結果に対しての結果を返却するのみ
    • そのため一つのノードの障害に依存しない形を実現可能

が分かります。
つまりRaftを使うことによりレプリカ間でのデータを、分散されたノード間で関連付けられることが分かりました!

まとめと所感

今回はNewSQLで利用されているRaftの技術を深堀りしてみました。
学習レベルですが、Raftの技術を実装から追いかけてみることにより、仕組みが一つ腹落ち出来た気がします。
そして、2つの仕組みを学ぶと全体の動きを見れるのはとても良いですね。
Raftを知るためには、このサイトをまず見てみてください。

thesecretlivesofdata.com

とてもグラフィカルに分かりやすく作られていて、雰囲気は掴みやすかったです。

所感として、Raftを進めて行く中で改めてNewSQLに対していろいろと疑問に感じた部分があります。

  • Leaderノードに対する書きこみ遅延が発生しそう
    • リーダーに選出されるノードは一つ。ここをSPOFにならないのだろうか?
      • 最初に出した図のように、Rangeの範囲でシャーディングを行っている
      • そのため、アプリケーションレベルで考えるとRangeに対する分割単位をどのように行うかがスケールさせる肝になってきたりするのかな?
  • Raftとは関係ないけど、SQLレイヤーで作成しているコードはどんな感じになってるんだろう
  • NewSQLはRDBMSでいうB-Tree構造とは違うと思うんだけど、何があるの?
    • LSM(Log-structured merge-tree)たるものがあるらしい。記事もいろいろある。この高速化はどう行うのかとか、仕組みはいまいちわかっていない。

など、Raftを知ることは、NewSQLを知るにはまだ氷山の一角なのだなと感じました。
これから他の技術を触れ合ってみたいですね。

最終的なコードはこちらに載せています。詳細気になったらご覧ください!

github.com

最後に宣伝です。

カミナシでは今動いている技術の選択肢だけじゃなくて、他にも事業が前にすすむような技術的なチャレンジを出来る企業です。
柔軟なインフラとサーバーを構築し、事業全体を前に進めてくれるようなSREの方や、エンジニアを募集しています!

エントリーお待ちしております!!

open.talentio.com open.talentio.com

参考文献