Gmail 管理者検疫に関するアラートをSlackへ通知させたい!

カンムでコーポレートエンジニアをやっているhikkyです。

今回セキュリティチームからの依頼で、GoogleWorkspace(以下GWS)の機能である 「高度なフィッシングと不正なソフトウェアへの対策」 という機能の一部を有効化しました。 しかしこの機能通知がメールのみのため、Slackへ通知させるということを行いました。
この時の内容をせっかくなので、ブログ記事にしてみます。

必要システム

この手順を実施するためには以下のシステムが必要です。

  • GoogleWorkspace Enterpriseエディション以上
  • Slack有料プラン
  • Google Cloud
  •   Cloud Functions
  •   Cloud Scheduler
  •    BigQuery
  • Python3

事前準備

Google Cloud設定

こちらのドキュメントを参考に、プロジェクトを設定します。 support.google.com

GWSのログをBigQueryへエクスポート設定

GWSのプランがEnterprise以上の場合、GWSのログをBigQueryにエクスポートすることができます。
通常GWSのログは6ヶ月しか残すことができませんが、BigQueryにエクスポートすることで長期間ログの保存が可能になり、クエリでログを検索することができるようになります。

  • GWS管理コンソールへアクセスします。
  • 「レポート」→「BigQuery Export」を開きます。
  • Google BigQueryへのGoogle Workspaceデータのエクスポートを有効にします」にチェックを入れます。
  • 事前に設定した、GCPプロジェクトID、任意のデータセット名、ロケーション制限を設定し、【保存】をクリックします。

設定してから反映されるまでに、最大48時間かかることがあるため、しばらく待ちます。

設定したGCPプロジェクトのBigQueryを開き、データセットが作成されたかを確認します。
(ここでのデータセット名はgwslogです)
データセット下に、activityとusageが表示されていればOKです。
ただしデータが実際に流れてくるまでに時間がかかることがあるようです。

データが入ってきたかは、各テーブルでプレビューを表示させることで判断できます。

サンプルクエリの実行

BigQueryにGWSログデータが流れてきたことが確認できたら、サンプルクエリを実行して確認してみます。
こちらのページ にクエリ例が掲載されているので、試しに特権管理者の数を出力するクエリを実行してみます。

SELECT COUNT(DISTINCT user_email) as number_of_super_admins, date
FROM api_project_name.dataset_name.usage
WHERE accounts.is_super_admin = TRUE
GROUP BY 2
ORDER BY 2 DESC;

以下のような結果が返ってきて、日付単位での増減がわかりますね。

GWSメール検疫を設定する

  • GWS管理コンソールへアクセスします。
  • 「アプリ」→「Google Workspace」→「Gmail」→「検疫の管理」を開きます。
  • 【検疫の追加】をクリックします。 検疫されたメールを確認できるグループを指定したいので、Defaultを利用せず新しい検疫を追加します。 GWSでグループを作成し、検疫確認を実施するメンバーをグループに追加します。 【グループを管理】検疫確認を行うグループを設定します。

「メールが検疫されたときに定期的に通知する」にチェックをつけて保存します。

BigQueryで検疫されたメールを出力する

当初以下のようなクエリで一覧が取得できると思ったのですが、処理容量が大きく、課金額も大きくなってしまうため、処理容量を減らす必要があります。

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     gmail.message_info.source.from_header_address as st_address,
     gmail.message_info.source.from_header_displayname as displayname,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id
FROM gwslog.activity d, d.gmail.message_info.destination
WHERE
     EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence
          WHERE consequence.action = 3)
     AND TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR)
ORDER BY
  timestamp DESC;

必要なテーブルの詳細を確認すると、日分割テーブルとなっていました。

そのためリアルタイムでの通知は諦めて、一日一回の通知だけをさせる方針としました。 修正したクエリは以下です。

SELECT
    FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S%z', TIMESTAMP_MICROS(gmail.event_info.timestamp_usec), 'Asia/Tokyo') as timestamp,
    gmail.message_info.subject,
    gmail.message_info.source.address as source,
    gmail.message_info.source.from_header_address as source_address,
    gmail.message_info.source.from_header_displayname as displayname,
    destination.address as destination,
    gmail.message_info.rfc2822_message_id
FROM 
    `{os.environ["PROJECT_ID"]}.{os.environ["DATASET_ID"]}.{os.environ["TABLE_ID"]}`,
    UNNEST(gmail.message_info.destination) as destination
WHERE 
    _PARTITIONTIME = TIMESTAMP(TIMESTAMP_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
    AND EXISTS (
        SELECT 1
        FROM UNNEST(gmail.message_info.triggered_rule_info) ri, UNNEST(ri.consequence) c
        WHERE c.action = 3
    )

Slackへ通知する

Cloud Functionsの設定

以下のようなpythonスクリプトを作成して、Cloud Functionsにデプロイします。

import os
import json
import requests
from google.cloud import bigquery
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

def query_to_slack(request, context):

    # Create BigQuery client
    client = bigquery.Client()

    # Define the query
    query = f"""
        SELECT
            FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S%z', TIMESTAMP_MICROS(gmail.event_info.timestamp_usec), 'Asia/Tokyo') as timestamp,
            gmail.message_info.subject,
            gmail.message_info.source.address as source,
            gmail.message_info.source.from_header_address as source_address,
            gmail.message_info.source.from_header_displayname as displayname,
            destination.address as destination,
            gmail.message_info.rfc2822_message_id
        FROM 
            `{os.environ["PROJECT_ID"]}.{os.environ["DATASET_ID"]}.{os.environ["TABLE_ID"]}`,
            UNNEST(gmail.message_info.destination) as destination
        WHERE 
            _PARTITIONTIME = TIMESTAMP(TIMESTAMP_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
            AND EXISTS (
                SELECT 1
                FROM UNNEST(gmail.message_info.triggered_rule_info) ri, UNNEST(ri.consequence) c
                WHERE c.action = 3
            )
    """

    # Execute the query
    query_job = client.query(query)
    results = query_job.result()

    if ( results.total_rows > 0 ):
        #クエリ結果が1件以上あった場合にSlack通知
        send_slack_notification(results)


def format_results(results):
    blocks = []
    blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "検疫されたメールがあります。\n"
            }
        })
    for result in results:
        blocks.append({
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*Timestamp:*\n{result['timestamp']}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Subject:*\n{result['subject']}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Source:*\n{result['source']}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Source Address:*\n{result['source_address']}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Display Name:*\n{result['displayname']}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Destination:*\n{result['destination']}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Message ID:*\n{result['rfc2822_message_id']}"
                }
            ]
        })
        blocks.append({
            "type": "divider"
        })
    return blocks

def send_slack_notification(results):
    client = WebClient(token=os.environ['SLACK_API_TOKEN'])

    try:
        response = client.chat_postMessage(
            channel="#nf-quarantined",
            text="Quarantined Email Notification",
            blocks=format_results(results)
        )
    except SlackApiError as e:
        print(f"Error sending message: {e}")

Cloud Schedulerを利用して定期実行させる

以下のドキュメントを参考に、Cloud Schedulerを利用して一日一回定期実行するように設定します。 cloud.google.com

実行結果の確認

検疫メールがある場合に、指定した時間になると以下のようなSlack通知がきます。

最後に

検疫機能便利ではあるのですが、指定ドメインを検疫対象から外すといったことができません。
そのため弊社で利用しているツールからの通知メールも入ってしまうことがあります。
メールをあまり見ないので、Slackへ通知させることで検疫されたメールがあると気づけるので便利になったのではないでしょうか?
GWSログをBigQueryにエクスポートすることで、ログをクエリで検索することができるようになるので色々と幅は広がりそうです。