カンムでコーポレートエンジニアをやっているhikkyです。
今回セキュリティチームからの依頼で、GoogleWorkspace(以下GWS)の機能である
「高度なフィッシングと不正なソフトウェアへの対策」
という機能の一部を有効化しました。
しかしこの機能通知がメールのみのため、Slackへ通知させるということを行いました。
この時の内容をせっかくなので、ブログ記事にしてみます。
必要システム
この手順を実施するためには以下のシステムが必要です。
- GoogleWorkspace Enterpriseエディション以上
- Slack有料プラン
- Google Cloud
- Cloud Functions
- Cloud Scheduler
- BigQuery
- Python3
事前準備
Google Cloud設定
こちらのドキュメントを参考に、プロジェクトを設定します。 support.google.com
GWSのログをBigQueryへエクスポート設定
GWSのプランがEnterprise以上の場合、GWSのログをBigQueryにエクスポートすることができます。
通常GWSのログは6ヶ月しか残すことができませんが、BigQueryにエクスポートすることで長期間ログの保存が可能になり、クエリでログを検索することができるようになります。
- GWS管理コンソールへアクセスします。
- 「レポート」→「BigQuery Export」を開きます。
- 「Google BigQueryへのGoogle Workspaceデータのエクスポートを有効にします」にチェックを入れます。
- 事前に設定した、GCPプロジェクトID、任意のデータセット名、ロケーション制限を設定し、【保存】をクリックします。
設定してから反映されるまでに、最大48時間かかることがあるため、しばらく待ちます。
設定したGCPプロジェクトのBigQueryを開き、データセットが作成されたかを確認します。
(ここでのデータセット名はgwslogです)
データセット下に、activityとusageが表示されていればOKです。
ただしデータが実際に流れてくるまでに時間がかかることがあるようです。
データが入ってきたかは、各テーブルでプレビューを表示させることで判断できます。
サンプルクエリの実行
BigQueryにGWSログデータが流れてきたことが確認できたら、サンプルクエリを実行して確認してみます。
こちらのページ にクエリ例が掲載されているので、試しに特権管理者の数を出力するクエリを実行してみます。
SELECT COUNT(DISTINCT user_email) as number_of_super_admins, date FROM api_project_name.dataset_name.usage WHERE accounts.is_super_admin = TRUE GROUP BY 2 ORDER BY 2 DESC;
以下のような結果が返ってきて、日付単位での増減がわかりますね。
GWSメール検疫を設定する
- GWS管理コンソールへアクセスします。
- 「アプリ」→「Google Workspace」→「Gmail」→「検疫の管理」を開きます。
- 【検疫の追加】をクリックします。 検疫されたメールを確認できるグループを指定したいので、Defaultを利用せず新しい検疫を追加します。 GWSでグループを作成し、検疫確認を実施するメンバーをグループに追加します。 【グループを管理】検疫確認を行うグループを設定します。
「メールが検疫されたときに定期的に通知する」にチェックをつけて保存します。
BigQueryで検疫されたメールを出力する
当初以下のようなクエリで一覧が取得できると思ったのですが、処理容量が大きく、課金額も大きくなってしまうため、処理容量を減らす必要があります。
SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp, gmail.message_info.subject, gmail.message_info.source.address as source, gmail.message_info.source.from_header_address as st_address, gmail.message_info.source.from_header_displayname as displayname, destination.address as destination, gmail.message_info.rfc2822_message_id FROM gwslog.activity d, d.gmail.message_info.destination WHERE EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence WHERE consequence.action = 3) AND TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR) ORDER BY timestamp DESC;
必要なテーブルの詳細を確認すると、日分割テーブルとなっていました。
そのためリアルタイムでの通知は諦めて、一日一回の通知だけをさせる方針としました。 修正したクエリは以下です。
SELECT FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S%z', TIMESTAMP_MICROS(gmail.event_info.timestamp_usec), 'Asia/Tokyo') as timestamp, gmail.message_info.subject, gmail.message_info.source.address as source, gmail.message_info.source.from_header_address as source_address, gmail.message_info.source.from_header_displayname as displayname, destination.address as destination, gmail.message_info.rfc2822_message_id FROM `{os.environ["PROJECT_ID"]}.{os.environ["DATASET_ID"]}.{os.environ["TABLE_ID"]}`, UNNEST(gmail.message_info.destination) as destination WHERE _PARTITIONTIME = TIMESTAMP(TIMESTAMP_SUB(CURRENT_DATE(), INTERVAL 1 DAY)) AND EXISTS ( SELECT 1 FROM UNNEST(gmail.message_info.triggered_rule_info) ri, UNNEST(ri.consequence) c WHERE c.action = 3 )
Slackへ通知する
Cloud Functionsの設定
以下のようなpythonスクリプトを作成して、Cloud Functionsにデプロイします。
import os import json import requests from google.cloud import bigquery from slack_sdk import WebClient from slack_sdk.errors import SlackApiError def query_to_slack(request, context): # Create BigQuery client client = bigquery.Client() # Define the query query = f""" SELECT FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S%z', TIMESTAMP_MICROS(gmail.event_info.timestamp_usec), 'Asia/Tokyo') as timestamp, gmail.message_info.subject, gmail.message_info.source.address as source, gmail.message_info.source.from_header_address as source_address, gmail.message_info.source.from_header_displayname as displayname, destination.address as destination, gmail.message_info.rfc2822_message_id FROM `{os.environ["PROJECT_ID"]}.{os.environ["DATASET_ID"]}.{os.environ["TABLE_ID"]}`, UNNEST(gmail.message_info.destination) as destination WHERE _PARTITIONTIME = TIMESTAMP(TIMESTAMP_SUB(CURRENT_DATE(), INTERVAL 1 DAY)) AND EXISTS ( SELECT 1 FROM UNNEST(gmail.message_info.triggered_rule_info) ri, UNNEST(ri.consequence) c WHERE c.action = 3 ) """ # Execute the query query_job = client.query(query) results = query_job.result() if ( results.total_rows > 0 ): #クエリ結果が1件以上あった場合にSlack通知 send_slack_notification(results) def format_results(results): blocks = [] blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "検疫されたメールがあります。\n" } }) for result in results: blocks.append({ "type": "section", "fields": [ { "type": "mrkdwn", "text": f"*Timestamp:*\n{result['timestamp']}" }, { "type": "mrkdwn", "text": f"*Subject:*\n{result['subject']}" }, { "type": "mrkdwn", "text": f"*Source:*\n{result['source']}" }, { "type": "mrkdwn", "text": f"*Source Address:*\n{result['source_address']}" }, { "type": "mrkdwn", "text": f"*Display Name:*\n{result['displayname']}" }, { "type": "mrkdwn", "text": f"*Destination:*\n{result['destination']}" }, { "type": "mrkdwn", "text": f"*Message ID:*\n{result['rfc2822_message_id']}" } ] }) blocks.append({ "type": "divider" }) return blocks def send_slack_notification(results): client = WebClient(token=os.environ['SLACK_API_TOKEN']) try: response = client.chat_postMessage( channel="#nf-quarantined", text="Quarantined Email Notification", blocks=format_results(results) ) except SlackApiError as e: print(f"Error sending message: {e}")
Cloud Schedulerを利用して定期実行させる
以下のドキュメントを参考に、Cloud Schedulerを利用して一日一回定期実行するように設定します。 cloud.google.com
実行結果の確認
検疫メールがある場合に、指定した時間になると以下のようなSlack通知がきます。
最後に
検疫機能便利ではあるのですが、指定ドメインを検疫対象から外すといったことができません。
そのため弊社で利用しているツールからの通知メールも入ってしまうことがあります。
メールをあまり見ないので、Slackへ通知させることで検疫されたメールがあると気づけるので便利になったのではないでしょうか?
GWSログをBigQueryにエクスポートすることで、ログをクエリで検索することができるようになるので色々と幅は広がりそうです。