jackc/pgxのErrBadConnリトライ・target_session_attrs

プラットフォームチームの菅原です。

GolangPostgreSQLドライバ jackc/pgxについて最近まで知らなかった機能があったので紹介します。

driver.ErrBadConnでのリトライ

データベースの再起動などで切断されたコネクションをコネクションプールから引き当ててエラーになる問題について、SetConnMaxLifetime()を設定して、定期的にコネクションをリフレッシュするしかないと思っていたのですが、こちらの記事でdriver.ErrBadConnのときにリトライしてくれることを知りました。

たしかにドキュメントには

ErrBadConn should be returned by a driver to signal to the database/sql package that a driver.Conn is in a bad state (such as the server having earlier closed the connection) and the database/sql package should retry on a new connection.

と書いてあり、database/sqlのコードを読むとfunc (db *DB) retry()でリトライ処理を行っています。

// go/src/database/sql/sql.go

// maxBadConnRetries is the number of maximum retries if the driver returns
// driver.ErrBadConn to signal a broken connection before forcing a new
// connection to be opened.
const maxBadConnRetries = 2

func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
    for i := int64(0); i < maxBadConnRetries; i++ {
        err := fn(cachedOrNewConn)
        // retry if err is driver.ErrBadConn
        if err == nil || !errors.Is(err, driver.ErrBadConn) {
            return err
        }
    }

    return fn(alwaysNewConn)
}

pgxのコードではSafeToRetry()がtrueを返すときにErrBadConnを返していました。特定のエラーや、エラー発生時にデータ送信がなかった場合などにリトライが許可されるようです。

cf. https://github.com/search?q=repo%3Ajackc%2Fpgx%20SafeToRetry&type=code

// pgx/stdlib/sql.go

    if err != nil {
        if pgconn.SafeToRetry(err) {
            return nil, driver.ErrBadConn
        }
        return nil, err
    }

以下のコードで動作を確認してみました。

package main

import (
    "database/sql"
    "fmt"
    "net/url"
    "time"

    _ "github.com/jackc/pgx/v5/stdlib"
    "github.com/mattn/go-tty"
)

func main() {
    url := &url.URL{
        Scheme: "postgres",
        User:   url.UserPassword("postgres", "xxx"),
        Host:   "xxx.ap-northeast-1.rds.amazonaws.com:5432",
        Path:   "postgres",
    }

    db, err := sql.Open("pgx", url.String())

    if err != nil {
        panic(err)
    }

    defer db.Close()
    db.SetConnMaxLifetime(0)
    db.SetConnMaxIdleTime(0)
    db.SetMaxIdleConns(1)
    db.SetMaxOpenConns(1)

    tty, err := tty.Open()

    if err != nil {
        panic(err)
    }

    defer tty.Close()

    for {
        // キー入力を待つ
        tty.ReadRune()

        var n int
        err = db.QueryRow("select 1").Scan(&n)

        if err != nil {
            fmt.Println(err)
            continue
        }

        fmt.Printf("select 1 => %d\n", n)
    }
}
// database/sql/sql.go

func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
    for i := int64(0); i < maxBadConnRetries; i++ {
        err := fn(cachedOrNewConn)
        // retry if err is driver.ErrBadConn
        if err == nil || !errors.Is(err, driver.ErrBadConn) {
            return err
        }
        // リトライ時の出力を追加
        fmt.Printf("[INFO] retried with error: %s\n", err)
    }

    return fn(alwaysNewConn)
}

キー入力でselect 1を実行しながら途中でデータベースを再起動してみると、リトライされていることが確認できました。

select 1 => 1
select 1 => 1
select 1 => 1
# ここでデータベースを再起動
[INFO] retried with error: driver: bad connection
select 1 => 1

target_session_attrs

こちらは別のブログ記事で知ったのですが、go-sql-driver/mysqlにはrejectReadOnlyというパラメーターがあり、Auroraがフェイルオーバーした際に降格したreaderノードに書き込みを行う問題を回避できるようになっていました。

pgxでも同様の機能がないか調べたところtarget_session_attrsというパラメーターで接続するノードの種別を指定できるようになっていました。

cf. https://github.com/jackc/pgx/blob/70f7cad2226dc12406b105f8bb5be9c62780aaf7/pgconn/config.go#L402-L417

   switch tsa := settings["target_session_attrs"]; tsa {
    case "read-write":
        config.ValidateConnect = ValidateConnectTargetSessionAttrsReadWrite
    case "read-only":
        config.ValidateConnect = ValidateConnectTargetSessionAttrsReadOnly
    case "primary":
        config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary
    case "standby":
        config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby
    case "prefer-standby":
        config.ValidateConnect = ValidateConnectTargetSessionAttrsPreferStandby
    case "any":
        // do nothing
    default:
        return nil, &ParseConfigError{ConnString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)}
    }

libpqにある機能ですがpgxも独自に実装しているようです。

// ValidateConnectTargetSessionAttrsReadWrite is a ValidateConnectFunc that implements libpq compatible
// target_session_attrs=read-write.
func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error {

以下のコードで動作を確認してみました。

package main

import (
    "database/sql"
    "fmt"
    "net/url"
    "time"

    _ "github.com/jackc/pgx/v5/stdlib"
)

func main() {
    params := url.Values{}
    // params.Add("target_session_attrs", "read-write")

    url := &url.URL{
        Scheme:   "postgres",
        User:     url.UserPassword("postgres", "xxx"),
        Host:     "xxx.ap-northeast-1.rds.amazonaws.com:5432",
        Path:     "postgres",
        RawQuery: params.Encode(),
    }

    db, err := sql.Open("pgx", url.String())

    if err != nil {
        panic(err)
    }

    defer db.Close()
    db.SetConnMaxLifetime(0)
    db.SetConnMaxIdleTime(0)
    db.SetMaxIdleConns(1)
    db.SetMaxOpenConns(1)

    for {
        time.Sleep(1 * time.Second)

        r, err := db.Exec("insert into test values ($1)", time.Now().String())

        if err != nil {
            fmt.Println(err)
            continue
        }

        n, _ := r.RowsAffected()
        fmt.Printf("RowsAffected: %d\n", n)
    }
}

target_session_attrsを設定しないコードを動かしてフェイルオーバーを行うと、切り替え後にERROR: cannot execute INSERT in a read-only transactionが発生しつづけてしまいます。

RowsAffected: 1
RowsAffected: 1
RowsAffected: 1
unexpected EOF
failed to connect to `user=postgres database=postgres`:
    xxx.xxx.xxx.xxx:5432 (xxx.ap-northeast-1.rds.amazonaws.com): dial error: dial tcp xxx.xxx.xxx.xxx:5432: connect: connection refused
    xxx.xxx.xxx.xxx:5432 (xxx.ap-northeast-1.rds.amazonaws.com): dial error: dial tcp xxx.xxx.xxx.xxx:5432: connect: connection refused
...
ERROR: cannot execute INSERT in a read-only transaction (SQLSTATE 25006)
ERROR: cannot execute INSERT in a read-only transaction (SQLSTATE 25006)
ERROR: cannot execute INSERT in a read-only transaction (SQLSTATE 25006)
...

target_session_attrs=read-writeを設定した場合には、切り替え後に検証が行われ書き込み可能なコネクションに接続することを確認できました。

RowsAffected: 1
RowsAffected: 1
RowsAffected: 1
unexpected EOF
failed to connect to `user=postgres database=postgres`:
    xxx.xxx.xxx.xxx:5432 (xxx.ap-northeast-1.rds.amazonaws.com): dial error: dial tcp xxx.xxx.xxx.xxx:5432: connect: connection refused
    xxx.xxx.xxx.xxx:5432 (xxx.ap-northeast-1.rds.amazonaws.com): dial error: dial tcp xxx.xxx.xxx.xxx:5432: connect: connection refused
...
failed to connect to `user=postgres database=postgres`:
    xxx.xxx.xxx.xxx:5432 (xxx.ap-northeast-1.rds.amazonaws.com): ValidateConnect failed: read only connection
    xxx.xxx.xxx.xxx:5432 (xxx.ap-northeast-1.rds.amazonaws.com): ValidateConnect failed: read only connection
RowsAffected: 1
RowsAffected: 1
RowsAffected: 1
...

まとめ

MySQLの話からなんとなく調べてみただけだったのですが有用な機能を知ることができました。

再接続やフェイルオーバー時のノード選択などの機能はライブラリに実装されず自前でライブラリを拡張することも多いのですが、このようにライブラリ側で実装されているとデータベースを運用する立場としてはとてもありがたいです。

自分の知らない機能はまだまだある気はするので、時間のあるときにでもこまごま深掘りできたらと思っています。

Go 1.24 で map が30%以上高速化!Swiss Tableとは?

Go1.24 がついに公開されましたね。その中でとても興味深い改善内容がありました。

簡単にいうと以下のような内容です。

  • map処理が30%+の高速化
  • CockroachDB1のチームで高性能なSwiss Tableを開発

なんとGo 1.24ではMapに関する処理が30%+の高速化しているそうです! それを実現しているSwiss Tableとは何かを少しみてみたいと思います。

初学者が勉強的に読んでいるので、誤った解釈などあれば教えて下さい 🙏

Go の従来の map

Go 1.23 以前の map は、バケットとオーバーフローバケット を備えた従来のハッシュテーブルが使用されていたそうです。

この方式の課題は、キャッシュの非効率性とポインタ追跡に課題があったそうです。

その問題を解決すべく選ばれたのは Swiss Table でした

cockroachdbのswissリポジトリのreadmeを読むと以下のようなドキュメントやライブラリもあるので、興味があれば覗いてみて下さい!

Swiss Table2とは?

高速なオープンアドレス方式3のハッシュテーブル実装であり、ハッシュ値の分割とSIMD(Single Instruction Multiple Data)4 命令の活用により、従来のハッシュテーブルよりも効率的な操作を実現しています。

ハッシュ値の分割:H1とH2

Swiss Tableでは、キーから生成された64ビットのハッシュ値を以下のように分割します。

  • H1(上位57ビット):テーブル内のグループ(バケット)の開始位置を決定するために使用

  • H2(下位7ビット)メタデータとして保存され、キーのハッシュシグネチャとして機能

この分割により、H1はデータの格納場所を特定し、H2はキー比較の前に候補を絞り込むためのフィルタとして機能します。

SIMD(Single Instruction Multiple Data)の活用

SIMDは、単一の命令で複数のデータを同時に処理する技術です。Swiss Tableでは、SIMD命令を使用して、メタデータの複数のバイトを一度に比較することで、検索や挿入操作の効率を大幅に向上させています。

具体的な手順

  1. メタデータの読み取り:H1で特定されたグループのメタデータ(複数のコントロールバイト)をSIMDレジスタに読み込み
  2. H2との比較SIMD命令を使用して、読み込んだメタデータとターゲットのH2を同時に比較
  3. 結果の解析:比較結果から、有効な候補の位置を特定し、実際のキーと値の比較

この方法により、複数のスロットを一度に検査でき、キャッシュの局所性を高め、分岐予測のミスを減らすことで、全体的なパフォーマンスが向上します。

これらの工夫により、Swiss Tableは高い効率性とパフォーマンスを実現しているようです。

CockroachDBのSwiss Table は何が違うのか

では、CockroachDBのSwiss Tableはどう実装されていることで、30%以上のパフォーマンス向上を果たしているのかを見てみましょう。

基本的には、GoogleのSwiss Tableの設計を踏襲しているようですね。

  • オープンアドレス方式とメタデータ配列
    • ここで、各スロットに1バイトのコントロールメタデータ)を持たせ、空・削除済み・使用中を示す点が説明されています。

https://github.com/cockroachdb/swiss/blob/main/map.go#L1468-L1473:embed:lang=go https://github.com/cockroachdb/swiss/blob/main/map.go#L25-L38:embed:lang=go

  • SIMD(Single Instruction Multiple Data)の活用
    • ARM向けのSWAR(SIMD Within A Register)による処理についても記載されており、複数スロットを一括でチェックする仕組みが解説されています。

https://github.com/cockroachdb/swiss/blob/main/map.go#L35-L38:embed:lang=go

  • 探索戦略(プロービング)
    • H1(上位57ビット)を使ってグループの開始位置を決定し、グループ内のコントロールバイト(H2)を調べることで候補を絞り込む方法が説明されています。
    • また、probeSeq 型の実装部分にも、グループごとの線形探索とグループ間の二次探索(quadratic probing)の流れが示されています。

https://github.com/cockroachdb/swiss/blob/main/map.go#L52-L59:embed:lang=go

  • バケットサイズとキャッシュ最適化
    • Google版ではメタデータとスロットが分離しているのに対し、CockroachDB版(この実装)では8個のコントロールバイトと8個のスロットをグループ化することで、同一キャッシュライン内でアクセスできるようにしている点が説明されています。

https://github.com/cockroachdb/swiss/blob/main/map.go#L40-L50:embed:lang=go

  • 削除処理の最適化
    • Tombstone(削除済みのフラグ)を使い、かつ隣接スロットの状態をチェックすることで、プロービングの不整合を防ぎながら効率的に削除を行う方法が述べられています。

https://github.com/cockroachdb/swiss/blob/main/map.go#L61-L69:embed:lang=go

https://github.com/cockroachdb/swiss/blob/main/map.go#L71-L93:embed:lang=go

ざっくりとまとめるとこんな感じでしょうか?

Google Abseil(C++ CockroachDB swiss(Go)
メタデータ管理 メタデータとスロットを分離 メタデータとスロットを同じキャッシュラインに配置
キャッシュ最適化 N-1スロット + N+groupSizeメタデータ 8スロット + 8メタデータでキャッシュ効率を向上
探索アルゴリズム グループ単位の線形探索 線形 + 二次探索を組み合わせたハイブリッド探索
リサイズ バケットを一斉にリサイズ 拡張可能なハッシュで局所的にリサイズ
削除処理 Tombstone利用 Tombstoneに加え、隣接スロットのチェックで最適化
SIMD最適化 x86: SIMD / ARM: SWAR x86: SIMD / ARM: SWAR + メタデータ最適化

今後の改善余地は?

このツイートを読むと、SIMDの使用率の改善、削除処理の向上、サイズ変更戦略の変更でより改善の余地を残しているそうです!


  1. CockroachDB githubの文言を引用しますが、

    CockroachDBは、トランザクション処理と強い一貫性を持つキーバリューストア上に構築された分散型SQLデータベースです。水平スケーリングが可能で、ディスク、マシン、ラック、さらにはデータセンターの障害にも最小限の遅延で対応し、手動の介入を必要としません。強い一貫性を持つACIDトランザクションをサポートし、データの構造化、操作、およびクエリのための馴染みのあるSQL APIを提供します。

    普段、分散型SQLデータベースを作成しているチームがコントリビュートしてくれていたのですね。 そのチームが開発したSwiss Tableが今回の改善に大きく貢献しているようです。

  2. SwissTable: A High-Performance Hash Table Implementationを参考
  3. オープンアドレス
  4. SIMD 【Single Instruction/Multiple Data】

Goのコネクションプーリングまわりのメトリクス収集

プラットフォームチームの菅原です。

Goでデータベースを使う場合には、以下のメソッドでコネクションプーリングまわりの設定を調整することが多いと思います。

MaxOpenConnsを設定してアプリケーションからの接続がデータベースのリミットを超えないようにしたり、MaxIdleConnsを設定してアイドルコネクションを保持し接続が都度発生しないようにしたりしますが、サービスにデプロイされた後はその設定が正しいか調べるためメトリクスが必要になります。

func (db *DB) Stats()はそれらコネクションプーリングの統計データを返すメソッドで、以下のような構造体を返します。

type DBStats struct {
    MaxOpenConnections int // Maximum number of open connections to the database.

    // Pool Status
    OpenConnections int // The number of established connections both in use and idle.
    InUse           int // The number of connections currently in use.
    Idle            int // The number of idle connections.

    // Counters
    WaitCount         int64         // The total number of connections waited for.
    WaitDuration      time.Duration // The total time blocked waiting for a new connection.
    MaxIdleClosed     int64         // The total number of connections closed due to SetMaxIdleConns.
    MaxIdleTimeClosed int64         // The total number of connections closed due to SetConnMaxIdleTime.
    MaxLifetimeClosed int64         // The total number of connections closed due to SetConnMaxLifetime.
}

カンムのサービスではモニタリング・ログ収集にDatadog、Goアプリからのログ出力にzerologを使っているので、アプリ起動時に以下のようなgoroutineを実行してコネクションプーリングの統計データをログ出力するようにしてみました。

// goroutine
func LogDBStats(db *sql.DB, host string, interval time.Duration, logger *zerolog.Logger) func() {
    ticker := time.NewTicker(interval)
    done := make(chan struct{})

    go func() {
        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                logger.Info().
                    Str("log_type", "db_stats").
                    Str("host", host).
                    Interface("db_stats", db.Stats()).
                    Msg("Database statistics")
            }
        }
    }()

    return func() {
        ticker.Stop()
        close(done)
    }
}
// アプリ側
if cfg.DBStatsIntervalSec > 0 {
    interval := time.Duration(cfg.DBStatsIntervalSec) * time.Second
    cleanup := utils.LogDBStats(db, cfg.DBHost, interval, &logger)
    defer cleanup()
}

こうするとDatadogのログ管理でコネクションプーリングの統計データを見られるようになります。

またログの属性値からグラフを作成することも可能です。

これでコネクションプーリングまわりをモニタリングできるようになって便利になりました…

…と、思っていたのですが、dd-trace-goを調べてみたらv1.63.0でDB Statsを収集するための変更が入っていました。

github.com

なのでcontrib/database/sqlsqltrace.Open()/OpenDB()を使っている場合、WithDBStats()を使えば普通にメトリクスを収集することができます。

db := sqltrace.OpenDB(connector, sqltrace.WithServiceName("my-service"), sqltrace.WithDBStats())

Datadogのログ管理でメトリクスを収集する場合、メトリクスの保持期間がログの保持期間と同じなってしまうので、長期的にメトリクスを保持したい場合はWithDBStats()を使った方が良さそうです。

データベースの固定パスワードをなくす

プラットフォームチームの菅原です。

カンムのサービスで使われている各種アプリケーション(Goアプリ・管理アプリ・Redash等)では、データベースに接続する場合に一般的なパスワード認証を使っていることが多いです。

しかし、パスワード認証はパスワード漏洩のリスクやパスワード管理の手間があり、また要件によっては定期的なパスワードの変更も必要になってきます。 単純な方法で安全にパスワードをローテーションしようとすると、新しいDBユーザーを作成し、アプリケーションの接続ユーザーを変更し、さらに必要であれば元のDBユーザーのパスワードを変更して、接続ユーザーを元に戻す…などのオペレーションが必要になります。

そこで、AWS RDS(PostgreSQL)の「Secrets Managerによるマスターユーザーパスワードのパスワード管理」と「IAMデータベース認証」を利用してシステムから固定パスワードをなくすようにしてみました。

Secrets Managerによるマスターユーザーパスワードの管理

docs.aws.amazon.com

「Secrets Managerによるマスターユーザーパスワード管理」はRDSのマスターユーザーパスワードをSecrets Managerに管理させて定期的にパスワードをローテーションさせる機能です。 パスワードを管理するSecretは自動的に作成されるので、RDS側の設定を変更するだけで機能は有効になります。

terraformでの設定は以下のようになります。

resource "aws_rds_cluster" "my_db" {
  cluster_identifier = "my-db"
  # ...
  manage_master_user_password = true

  # aws secretsmanager get-secret-value \
  #   --secret-id $(
  #    aws rds describe-db-clusters \
  #      --db-cluster-identifier my-db \
  #      --query 'DBClusters[0].MasterUserSecret.SecretArn' \
  #      --output text
  #   ) \
  #   --query SecretString --output text
}

Secret自体は自動作成されるのですがローテーション間隔やポリシーは管理したいので、作成されたSecretをterraformにインポートします。

resource "aws_secretsmanager_secret_rotation" "my_db" {
  secret_id = aws_rds_cluster.my_db.master_user_secret.0.secret_arn

  rotation_rules {
    automatically_after_days = 7
  }
}

resource "aws_secretsmanager_secret_policy" "my_db" {
  secret_arn = aws_rds_cluster.my_db.master_user_secret.0.secret_arn

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "secretsmanager:GetSecretValue"
        Condition = {
          ArnNotEquals = {
            "aws:PrincipalArn" = "(アクセスを許可するIAMロール)"
          }
        }
        Effect    = "Deny"
        Principal = "*"
        Resource  = "*"
      }
    ]
  })
}

オンラインの設定変更でも特にダウンタイムが発生するようなこともなく、簡単にマスターユーザーパスワードのローテーションを自動化することができました。

IAMデータベース認証

docs.aws.amazon.com

「IAMデータベース認証」はパスワードの代わりに一時的な認証トークンを生成して許可されたIAMロールからデータベースに接続できるようにする機能です。認証トークンの有効期限は15分で、データベースに接続できれば基本的にコネクションは維持されます。

マスターユーザーパスワードと違ってアプリケーションで使われているので、アプリケーション側の修正も必要になります。

RDS・IAMの設定

「IAMデータベース認証」を有効にするには、まずRDSの設定を変更します。

resource "aws_rds_cluster" "my_db" {
  cluster_identifier = "my-db"
  # ...
  iam_database_authentication_enabled = true
}

PostgreSQLの場合、接続するDBユーザー(ロール)にrds_iamロールを付与します。

GRANT rds_iam TO app_db_user;

rds_iamを付与するとIAM認証以外では接続できなくなるので注意が必要です。 IAM認証を有効化する際には、IAM認証用のユーザーを新しく作成して、アプリケーションの接続ユーザーをそちらに変更するようにしました。

DBに接続するEC2インスタンスやECSタスクのIAMロールにはrds-db:connectの権限を付与します。

resource "aws_iam_role_policy" "app_db_connect" {
  role = aws_iam_role.app.name
  name = "db-connect"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = "rds-db:connect"
        Resource = "arn:aws:rds-db:ap-northeast-1:123456789012:dbuser:${aws_rds_cluster.my_db.cluster_resource_id}/app_db_user"
      }
    ]
  })
}

アプリケーション側の変更

Goアプリ

database/sqlsql.Open()はコネクションプールを返すので、sql.Open()を呼び出すタイミングと実際にデータベースに接続するタイミングは異なります。IAM認証を使う場合、実際にデータベースに接続するタイミングで認証トークンを生成するように設定する必要があります。

PostgreSQL用のドライバjackc/pgxではstdlib.GetConnector()にオプションを渡すことでデータベース接続のコールバックを設定することができます。

実際のコードは以下のような感じになりました。

func ConnectDB(dsn *url.URL, iamAuth bool) (*sql.DB, error) {
    opts := []stdlib.OptionOpenDB{}

    if iamAuth {
        host := dsn.Hostname()
        port := dsn.Port()
        username := dsn.User.Username()

        if !strings.HasSuffix(host, ".rds.amazonaws.com") {
            var err error
            host, err = net.LookupCNAME(host)

            if err != nil {
                return nil, err
            }

            host = strings.TrimSuffix(host, ".")
        }

        // import "github.com/jackc/pgx/v5/stdlib"
        opts = append(opts, stdlib.OptionBeforeConnect(func(ctx context.Context, cc *pgx.ConnConfig) error {
            awscfg, err := config.LoadDefaultConfig(ctx)

            if err != nil {
                return err
            }

            // import "github.com/aws/aws-sdk-go-v2/feature/rds/auth"
            token, err := auth.BuildAuthToken(ctx, host+":"+port, awscfg.Region, username, awscfg.Credentials)

            if err != nil {
                return err
            }

            cc.Password = token
            return nil
        }))
    }

    cfg, err := pgx.ParseConfig(dsn.String())

    if err != nil {
        return nil, err
    }

    // import "github.com/jackc/pgx/v5/stdlib"
    connector := stdlib.GetConnector(*cfg, opts...)
    db := sql.OpenDB(connector)
    return db, nil
}

データベースのエンドポイントにはCNAMEのエイリアスをつけているので、net.LookupCNAME()で実際のエンドポイントを取得しています。

Djangoアプリ

管理用のDjangoアプリはlabd/django-iam-dbauthで対応しました。

DATABASES = {
    "default": {
        "HOST": "<hostname>",
        "USER": "<user>",
        "NAME": "<db name>",
        "ENGINE": "django_iam_dbauth.aws.postgresql",
        "OPTIONS": {
            "use_iam_auth": True,
            "sslmode": "require",
            "resolve_cname_enabled": True,
        }
    }
}

CNAMEの解決機能が実装されているのが便利です。

DBマイグレーション

GoアプリのDBマイグレーションにはAlembicを使っています。 データベースのパスワードは環境変数経由でAlembicに渡されます。

環境変数で認証トークンを渡す場合、AWS CLIで生成することが多いのですが「AWS CLI一式をDockerイメージに入れたくない」「ホストやユーザー名の渡し方を簡潔にしたい」「CNAMEの解決をしたい」といった理由から、認証トークンを生成するCLI rdsauthを作成しました。

github.com

rdsauthはDB URLを渡すと認証トークンを生成します。また-eオプションをつけることでexport PGPASSWORD=...の形式で出力することもできます。

rdsauthを使って、以下のようにDBマイグレーションを実行するようにしました。

export DB_USER=...
export DB_HOST=...
export DB_PASSWORD=$(rdsauth postgres://${DB_USER}@${DB_HOST})

if [ "$MIGRATE" = "true" ]; then
  python manage.py migrate
else
  python manage.py showmigrations
fi

AlembicをIAM認証に対応させる上で少しつまずいたのがconfigparserのエスケープです。 env.pyで以下のようにしてDB接続情報を渡すことがあると思うのですが、configは内部的にconfigparserを使っているため%エスケープが必要になります。

# dbpass = os.environ.get("DB_PASSWORD") # NG
dbpass = os.environ.get("DB_PASSWORD").replace("%", "%%")
config.set_section_option("alembic", "DB_PASSWORD", dbpass)

connectable = engine_from_config(
    config.get_section(config.config_ini_section),
    # ...
)

psql

基本的にpsqlで直接データベースに接続することはないのですが、どうしてもpsqlでの作業が必要になることがあります。 そのための作業用ユーザーもIAM認証で接続するようにしました。

psqlの接続情報は.pg_service.confで管理されており、$(rdsauth -e postgres://..) ; psql service=my-dbと実行すればデータベースに接続することができるのですが、rdsauthの接続情報は.pg_service.confから自動的に取得してほしかったので簡単なラッパースクリプト pxを作成しました。

#!/bin/bash
PC_SERVICE_CONF=~/.pg_service.conf

if [ $# -eq 0 ]; then
  echo "usage: px <service> [extra-args ...]"
  echo -e "\nservice:"
  grep '^\[' $PC_SERVICE_CONF | tr -d '[]' | sed 's/^/  - /'
  exit 0
fi

set -eo pipefail

NAME=$1
shift

JSON=$(ini2json $PC_SERVICE_CONF | jq --arg name "$NAME" '.[$name]')

if [ "$JSON" = "null" ]; then
  echo "error: service not found - $NAME"
  exit 1
fi

DB_HOST=$(echo "$JSON" | jq -r '.host')
DB_PORT=$(echo "$JSON" | jq -r '.port')
DB_USER=$(echo "$JSON" | jq -r '.user')

$(rdsauth -e "postgres://${DB_USER}@${DB_HOST}:${DB_PORT}")

psql service="$NAME" "$@"

px my-dbを実行することでIAM認証を意識せずにpsqlでデータベースに接続できます。

Terraform(PostgreSQL)

PostgreSQLのロールはterraform(cyrilgdn/terraform-provider-postgresql)で管理しているので、これもIAM認証で接続するようにしました。

プロバイダーにはrdsauthを使って環境変数経由で認証トークンを渡すこともできるのですが、aws_rds_iam_authというオプションを有効にすればプロバイダー内部で認証トークンを生成してくれます。

terraform {
  required_providers {
    postgresql = {
      source  = "cyrilgdn/postgresql"
      version = ">= 1.25"
    }
  }
}

variable "aws_rds_iam_auth" {
  type    = bool
  default = false
}

provider "postgresql" {
  # ...
  aws_rds_iam_auth = var.aws_rds_iam_auth
}

Redash

Redashは、Redash本体のデータベースへの接続とPostgreSQLデータソースの接続でIAM認証をすることになりますが、残念ながら今のところどちらもIAM認証には対応していません。

しかし、修正自体はそれほど難しくはないのでPull Requestを作成しました。マージされることを祈るばかりです。

github.com github.com

まとめ

AWS RDSの機能を使って固定パスワードをなくす話を書きました。

今までは積極的にこれらの機能を使ってこなかったのですが、やってみると特に問題なく導入することができました。 パスワードのローテーション作業は気をつかう手間のかかる作業なので、今後はなるべくデフォルトで有効にして運用コストを下げていきたいと考えています。

SAML Group Mappingを使ったEntra ID+Datadogのロール自動割り当て

プラットフォームチームの菅原です。

カンムではサービスのモニタリングにDatadogを利用しており、その管理はプラットフォームチームが担っています。

DatadogへのログインはMicrosoft Entra ID(旧AzureAD)を使ったシングルサインオンで行うため、DatadogのアカウントはEntra IDで一元管理されているのですが、Datadogのロールの割り当ては自動化されておらず、引き続き依頼を受けてプラットフォームチームが割り当てを行う状態でした。

カンムで使っているDatadogのロールはEntra IDのグループから一意に割り当てられるため、機械的マッピングすることができます。 調べたところ、サインオン時に任意のSAML属性からロールを割り当てるSAML Group Mappingという機能が提供されていたので設定してみました。

docs.datadoghq.com

Entra IDの設定

SAML Group Mappingを利用するにはユーザーが所属するグループに応じてSAML属性(クレーム)の値を動的に変更する必要があります。

最初、user.assignedrolesというソース属性を使ってエンタープライズアプリケーションのグループに割り当てられたロールでクレームの値を変更することを想定していたのですが、user.assignedrolesからはユーザーに割り当てられたロールを取得することができても、グループに割り当てられたロールを取得することができなかったため、「グループごとにクレームの値を変える」という要件を満たすことができず、また、設定も煩雑になってしまうためこちらを使うことができませんでした。

代わりに「条件に基づいてクレームを出力する」機能で要求条件(Claim conditions)で、グループごとの値を設定することでクレームの値を動的に変更することができました。以下はその設定画面です。

要求条件では、最終的に条件にマッチした値がクレームの値になります。

上記の例の場合、プラットフォームグループにマッチした場合、DatadogRoleクレームの値は adminに、開発者グループにマッチした場合はstandardに、どちらにもマッチしなかった場合はreadonlyになります。

(残念ながらterraformのazureadプロバイダでterraformingすることはできなかったため、手動で設定を行いました)

Datadogの設定

Datadog側の設定はterraformで行いました。 DatadogRoleクレームの値に応じて、ロールが設定されるようになっています。

data "datadog_role" "admin" {
  filter = "Datadog Admin Role"
}

data "datadog_role" "standard" {
  filter = "Datadog Standard Role"
}

data "datadog_role" "readonly" {
  filter = "Datadog Read Only Role"
}

# NOTE: Enable Mappings manually
# cf. https://my-org.datadoghq.com/organization-settings/mappings/role-mappings
resource "datadog_authn_mapping" "admin" {
  key   = "DatadogRole"
  value = "admin"
  role  = data.datadog_role.admin.id
}

resource "datadog_authn_mapping" "standard" {
  key   = "DatadogRole"
  value = "standard"
  role  = data.datadog_role.standard.id
}

resource "datadog_authn_mapping" "readonly" {
  key   = "DatadogRole"
  value = "readonly"
  role  = data.datadog_role.readonly.id
}

# NOTE: Entra ID SAML Mapping config
# https://portal.azure.com/#view/Microsoft_AAD_IAM/SamlClaimsEditClaimBladeV2/federatedSsoConfigurationIdentifier/...

まとめ

SAML Group Mappingを使うことでDatadogのアカウントの管理を完全にEntra IDで一元化することができました。

この手の基盤チームありがちな「○○の権限をください」というタスクは一つ一つはたいしたことはないのですが、積み重なると地味に作業の時間を奪われるので、なるべく自動化を進めていきたいところです。

Poolにおける残高管理の設計

こんにちは、エンジニアのpongzuです。 今日は、カンムが提供するプロダクト「Pool」の残高管理設計について書きます。

本記事では、まずPoolがどのような仕組みで動いているのか、投資やウォレットといったサービス仕様を簡単にご説明します。その後、それらをどのようにDB管理するかについて、設計の考え方を順を追って解説していきます。

仕様

Poolは簡単にいうと投資とVisaカードを組み合わせたプロダクトです。 アプリ上で口座開設を行い、ウォレットと呼ばれる機能に入金後、資産運用やカード決済に利用できます。 また、同時にウォレット残高と投資資産がカード利用可能額となり、Visaの加盟店でカード決済をすることができます。以下に仕組みを簡単に図解したものを貼ります。

※この仕様はシステム設計の説明のために簡易化したものです。正式な仕様はサービス紹介・利用規約をご参照ください。

ウォレット・投資・カードの機能について、もう少し詳しく説明します。

ウォレット

ウォレットは、入金の受け皿となる機能です。入金処理が行われると、その金額がウォレット残高として反映されます。また、入金以外にも、キャンペーン対象者へのポイント付与やカード支払いによるポイント還元などもウォレット残高に加算されます。

投資

ユーザーは、ウォレット残高から募集中のファンドへ投資申込を行うことができます。運用開始時にウォレット残高から投資資産へ資金が移行します。運用終了後には、資産(以下「償還額」と呼びます)を次の方法で利用できます。

  • ユーザーが指定する銀行口座への出金
  • 次の募集中ファンドへの再投資

カード

ウォレット残高と投資資産がカード利用可能額となり、Visa加盟店でカード決済を利用できます。 カード利用額は、利用月の月末に確定し翌月末にウォレット残高から支払われます。いわゆる「月末締め・翌月払い」のクレジットカード方式です。ただし、支払い額がウォレット残高を上回る場合に限り、カンムがファンドの持ち分を買い取る形で支払いに充てられます。これにより、ウォレット残高だけでなく、投資資産を活用したカード支払いが可能となっています。

以上のようにしてPoolは資産運用と預け入れた資金の流動性を両立させています。

DB設計

ここからは本題であるデータベース設計について説明します。 Poolの残高管理には、入金履歴・投資運用・カード支払いの3つが関係します。これらのテーブルがどのように関わり合い、残高管理を実現しているのかを順を追って解説します。 まず、主要なテーブルを抜き出してみると、deposit, investment_application, investment, card_payment, wallet_snapshotの5つが挙げられます。

※説明の便宜上すべてのテーブルにuser_idを持たせていますが、実際のリレーションとは異なります。

                                        Table "deposit"
     Column              |           Type           | Collation | Nullable |             Default
-------------------------+--------------------------+-----------+----------+----------------------------------
 id                      | bigint                   |           | not null | nextval('deposit_id_seq'::regclass)
 user_id                 | bigint                   |           | not null | 
 wallet_snapshot_id      | bigint                   |           | not null | 
 amount                  | numeric                  |           | not null | 

depositは入金履歴を管理するためのテーブルです。振込入金口座から入金通知を受けて入金額がウォレット残高に反映されます。 wallet_snapshot_idについては後述します。

                                        Table "investment_application"
     Column     |           Type           | Collation | Nullable |             Default
----------------+--------------------------+-----------+----------+----------------------------------
 id             | bigint                   |           | not null | nextval('investment_application_id_seq'::regclass)
 user_id        | bigint                   |           | not null |
 amount         | numeric                  |           | not null |

investment_application は投資申込を管理するためのテーブルです。募集中ファンドへの投資申込を行った際に作成され、運用開始日を迎えるまではウォレット残高に影響を与えません。また、その期間はキャンセルや金額の変更を行うことも可能です。

                                        Table "investment"
     Column     |           Type           | Collation | Nullable |             Default
----------------+--------------------------+-----------+----------+----------------------------------
 application_id | bigint                   |           | not null |
 user_id        | bigint                   |           | not null |
 amount         | numeric                  |           | not null |

investmentは運用開始後に作成される運用資産を表すテーブルです。investment_applicationのIDをPKとします。このレコードの作成時にウォレット残高から運用資産に残高が移行します。

                                        Table "card_payment"
     Column     |           Type           | Collation | Nullable |             Default
----------------+--------------------------+-----------+----------+----------------------------------
 id             | bigint                   |           | not null | nextval('card_payment_id_seq'::regclass)
 user_id        | bigint                   |           | not null |
 amount         | numeric                  |           | not null | 

card_paymentはカード支払いを管理するためのテーブルです。カード利用額の支払い処理で作成されます。詳しい内容は割愛しますが、実際にはcard_paymentが作成される前段にVisaから届いた決済確定電文を表すテーブルとそれを集計するテーブルが存在し、それを元にcard_paymentが作成されます。支払い額は、まずウォレット残高から差し引き、不足分は投資資産から相殺するという順番で充当されます。

                                         Table "wallet_snapshot"
     Column                 |           Type           | Collation | Nullable |             Default
---------------------------+--------------------------+-----------+----------+----------------------------------
 id                        | bigint                   |           | not null | nextval('card_payment_id_seq'::regclass)
 user_id                   | bigint                   |           | not null |
 previous_snapshot_id      | bigint                   |           |          | 
 amount                    | numeric                  |           | not null |

wallet_snapshot は、ウォレット残高を管理するためのテーブルです。ウォレット残高に変動があるイベントが発生すると、その時点の最新残高を反映した wallet_snapshot が作成されます。 previous_snapshot_idには前回のwallet_snapshotのIDを持たせて残高の推移を表現するようにしました。

以上が主要なテーブルの概要です。次に、ウォレット残高を投資やカードとどのように関連付けるかについて説明します。 ウォレット残高が必ず動くイベントが発生する際にはイベント管理用テーブルにwallet_snapshot_idをFKとして持たせて、そうではない場合は中間テーブルを介してウォレット残高とイベントを紐つけるようにしています。 例えば、depositは入金時に必ずウォレット残高に反映されるため、wallet_snapshot_idをFKとして持ちます。投資とカード支払いについてはウォレット残高から充当された場合のみ、中間テーブルを介してイベントとウォレット残高の変更が紐つけられます。

このようにすることで、資金の流れや原資の管理が容易となりました。 例えば、ウォレット残高や再投資から充当された運用資産の内訳は、investment テーブルとそれに紐づく中間テーブルを辿るクエリで算出可能です。

ここで簡単に実装例も書いておきます。 ウォレット残高から投資を開始するコードはこんな感じです。

※カンムではGoを使っています。

// トランザクションを開始
tx, err := db.Begin()
// 必ずロールバックするように設定
defer tx.Rollback()

// ユーザーIDでロックを取得
if err := LockUser(tx, userID); err != nil {return}

// 申込を取得してInvestmentテーブルに登録
application, err := GetInvestmentApplication(tx, userID)
if err != nil {return}

investment := Investment{
    ApplicationID: application.ID,
    Amount:        application.Amount,
}
if err := investment.Create(tx); err != nil {return}

// 最新のウォレットスナップショットを取得
previousSnapshot, err := GetLatestWalletSnapshotByUserID(tx, userID)
if err != nil {return}

// 新しいウォレット残高を計算し、ウォレットスナップショットを登録
newWalletAmount := previousSnapshot.Sub(investment.Amount)
walletSnapshot := WalletSnapshot{
    UserID:             userID,
    WalletAmount:       newWalletAmount,
    PreviousSnapshotID: previousSnapshot.ID,
}
if err := walletSnapshot.Create(tx); err != nil {return}

// 中間テーブルの作成
r := WalletSnapshotInvestment{
   WalletSnapshotID: walletSnapshot.ID,
   InvestmentID: investment.ID,
}
if err := r.Create(tx); err != nil {return}

// コミット
if err := tx.Commit(); err != nil {return}

スナップショットを用いた他機能への接続は、ロックのとり忘れやスナップショットとイベント管理用テーブルの作成忘れなどに注意する必要がありますが、以下のメリットもあるように思います。

  • ウォレット残高の参照を最新のwallet_snapshotを取得すれば良いので計算量が抑えられる
  • 過去のある時点のウォレット残高の算出が容易となった
  • 残高を都度更新する必要がない
    • たとえば、「ウォレット」というテーブルに残高(balance)を保持し、入出金や支払いのたびにアップデートをかけるといった実装が不要

設計思想

最後に、ウォレット残高をこのような設計で管理することに至った背景についてお話しします。 ウォレットは、ユーザーに必ず1つ割り当てられる、いわばリソース型のエンティティです。 しかし、ウォレット残高自体は入金、投資運用、カード支払いといったイベントの履歴から計算可能であり、以下のようなクエリで表現できます。

SELECT COALESCE(SUM(amount), 0)
    FROM (
      SELECT amount
      FROM deposit
      WHERE user_id = $1
    UNION ALL
      SELECT -amount
      FROM investment
      WHERE user_id = $1
    UNION ALL
      SELECT -amount
      FROM card_payment
      WHERE user_id = $1
  ) AS _

このようにイベントの履歴のみを記録しておく設計では、専用の残高管理用テーブルを用意する必要はありません。このアプローチは実際に弊社の別プロダクト「バンドルカード」でも採用しており、詳細はこちらでご紹介しています。

一方で、 Pool はスナップショット形式ではあるものの残高を実体として管理する設計を採用しています。背景としてはウォレット残高がPoolというサービスを支える基盤的な機能であり、他の機能と疎結合な状態である必要があると考えたからです。 投資やカードといった機能は、法律上ウォレットを介することで初めて成立します。この構造は、Poolというサービスが存在する限り変わることのない重要な要件です。現在は投資とカードが資金の移動先として存在していますが、将来的に「保険」など新しい機能が追加される可能性もあれば、「カード」機能を削除する必要が生じることも考えられます。こうした変化に柔軟に対応するためには、ウォレット残高を他の機能から独立させた疎結合な設計が求められました。

さらに、ウォレット残高はサービスの中心として、単に機能を支えるだけでなく、多様な資金の流れを生み出す役割も果たします。この資金の流れを適切に管理することは、会計上の重要な課題でもあります。そのため、ウォレット残高を独立したエンティティとして定義し、各イベントをそこに紐づけることで、資金の流れを管理できる設計を目指しました。

要するに、ウォレットを他の機能から切り離すことで、法的・会計的な要件に柔軟に対応しつつ、拡張性の高い設計を実現することが今回のポイントでした。 こうした考えを踏まえ、スナップショットを活用して他の機能と接続する設計を選択するという結論に至りました。

まとめ

以上、この記事では、Poolが提供する「ウォレット」「投資」「カード」という機能の概要と、それらをDBでどのように管理しているかについて説明しました。 特に、スナップショットを用いた管理は、残高の推移や他機能への資産の流れを把握しやすくする上で大きな利点がありました。もちろん、残高の設計は一筋縄ではいかず、メリットとデメリットの両面がありますが、現時点ではこの設計を採用して正解だったと感じています。

最後に

カンムではソフトウェアエンジニアを募集しています。ご興味ある方はぜひご連絡ください!

herp.careers

エンジニアによるFintech法律勉強会を開きました

はじめに

ソフトウェアエンジニアの hata です。先日、カンムが提供するプロダクトの一つ、 Pool を取り巻くFintech法律勉強会を開きました。

pool-card.jp

金融は最も規制が厳格で複雑な分野です。金融サービスは、複数の法律から最適な組み合わせを選ぶことが競争優位になりえます(ルールに内在する「余白」を正しく理解し、自らのビジネスに最大限有効・有利に活用するという発想)。

Poolは、他にあまり見ない、投資・決済が一つになったサービスを提供しており、その法的構成も非常にユニークです。

ただし、その法的構成のユニークさから、Poolがどのような法律を組み合わせたプロダクトであるのか、非リーガル関連職のメンバーにとってはとっつきにくい内容となっています。社内に勉強できる資料はいくつかあるのですが、初学者向けにまとまった内容になっていませんでした。

そこで、Poolをとりまく法規制を整理し、非リーガル関連職でもスムーズに理解できるように勉強会を開くことにしました。

なぜソフトウェアエンジニアが法規制について学ぶ必要があるのか

  • 法規制の知識がないと、画期的なプロダクトの改善施策を考えたとしても、その案が今の法的構成では実現不可能な場合があります。法規制について良く知ることで、やってはいけないことを回避できることはもちろん、逆に規制を活用したアイデアが議論の中で生まれるかもしれません。
  • ソフトウェアエンジニアリングは、与えられた制限の中での最適解を見つける活動でもあります。法規制やレギュレーションはその制限の一つであり、それを理解しソフトウェアに落とし込んでいくことでエンジニアとしてのスキルも上げられると考えています。

つまるところ、エンジニアがドメイン知識を身につけることで賢くなると、エンジニア自身もチームも嬉しい、というわけですね。

社内にあるホワイトボードの図を借りると、Fintechに関わる法律をプロダクトに関わるメンバー全員が勉強することで「いこうぜ」と「わーい」の第一象限に当てはまるということです。

社内ホワイトボードに描かれたMECEについての図

勉強会の開催にあたり、気をつけたこと

法律分野の用語や法的整理は非常にナイーブです。JavaJavaScript が異なるように、曖昧なまま理解をすると非常に危険です。金融サービスを作っている以上、法律の誤用は最悪の場合ユーザーの金融資産への侵害につながります。

私はあくまでソフトウェアエンジニアなので、間違った解釈を広めないよう、勉強会に使う資料は事前にリーガルチームにチェックしてもらいました。また、勉強会にも同席してもらい、私が誤ったことを口走ろうものならツッコミを入れてもらうようにしました。

実際の様子

勉強会の様子。弊社はリモート組織なので、Google Meetにて開催。

内容的に全てを公開することはできませんが、以下のような構成で行いました。

  • Pool の各機能(ウォレット・決済・投資)を取り巻く法規制(例:資金決済法、割賦販売法、金融商品取引法)の整理
  • 各法律に関するライセンス(前払式支払手段、第二種金融商品取引業)の解説
  • サービス全体を取り巻く法規制(犯罪収益移転防止法)
  • Poolの機能に関する法的な観点をQ&A形式で掘り下げ

以下は、勉強会の資料の一部です。

おわりに

元々は、Pool のメンバーが増えてきたこともあってオンボーディング的にやろうとしたのがきっかけでした。勉強会参加者の方々からは、好意的な感想をもらえたのでよかったです。

単発の勉強会で限られた時間で全てを理解するのは難しいとは思うのですが、今までは勉強するためのとっかかりすらなかったので、後学のために資料を残す機会を作れたのは個人的に良かったなと思っています。

カンムではソフトウェアエンジニアを募集しています。ソフトウェアエンジニアリングにとどまらず、Fintech最前線に立ってあらゆる知識に触れる機会があるおもしろポジションです。少しだけでも話を聞いてみたいという方でも大歓迎なので、ぜひカジュアル面談でお話ししましょう。

herp.careers