データ同期フロー(Sync Data Flow)
概要
GMAC MEOサービスでは、毎日定時にデータ同期と検索ランキングスクレイピングを自動実行する。 3つの顧客システム(GCOR、GMAC/Mappy、PIPIT)のマスターデータをスクレイパーDBに集約し、AWS Batchで並列スクレイピングを行い、結果を各システムに書き戻す。
対象システム
| システム | system_id | DB クラスター | 用途 |
|---|---|---|---|
| GMAC / Mappy | 1 | gmac-db-production-cluster-1 | メイン顧客DB |
| GCOR | 2 | gcor-db-production-cluster | 顧客DB |
| PIPIT | 3 | kingmeo-db-production-cluster | 顧客DB |
| Scraper | - | scraper.cluster-* | 中間処理DB |
全体フロー
各ステップの詳細
Step 1: sync_database
| 項目 | 値 |
|---|---|
| Lambda関数名 | sync_database |
| Runtime | Python 3.13 |
| Timeout | 900秒 |
| Role | Lambda_scraper |
| VPC | あり(プライベートサブネット) |
| 実行時間 | 約2分 |
処理内容:
- GCOR、GMAC/Mappy、PIPIT の3つのDBに接続
- 既存データのバックアップテーブルを作成(
backup_*テーブル) - 以下のテーブルをスクレイパーDBに同期:
mappy_user_settingsmappy_gbp_locationsmappy_gbp_locations_store_codesmappy_integrated_gbp_locationsmappy_keywords
- 完了後、EventBridge にカスタムイベントを発行:json
{ "detail-type": "SyncDatabaseCompleted", "source": "mappy.sync_db", "detail": { "status": "completed", "timestamp": "2026-04-07T17:02:13+09:00" } }
環境変数(NEXT_LAMBDA_FUNCTION_NAME): fetch_search_ranking — 次に呼び出すLambda関数名。
Step 2: fetch_search_ranking
| 項目 | 値 |
|---|---|
| Lambda関数名 | fetch_search_ranking |
| Runtime | Python 3.13 |
| Timeout | 900秒 |
| Role | TestCreateBatch-role-* |
| 実行時間 | 約47秒 |
処理内容:
- EventBridge イベント
SyncDatabaseCompletedをトリガーとして起動 - 80個のAWS Batchジョブを投入:
- Batch 1〜40:
normal(通常検索)— 各350キーワード、Offset 0〜13,650 - Batch 41〜80:
reverse(逆検索)— 各350キーワード、Offset 0〜13,650
- Batch 1〜40:
- 合計約28,000キーワードを並列処理
Step 3: AWS Batch(Fargate)
| 項目 | 値 |
|---|---|
| ジョブ定義 | scraper_def |
| ジョブキュー | scrap_queue |
| コンピューティング環境 | scraper_env_v1(Fargate) |
| コンテナイメージ | 881980194724.dkr.ecr.ap-northeast-1.amazonaws.com/scraper:latest |
| vCPU | 1.0 |
| メモリ | 2048 MB |
| 並列数 | 最大80コンテナ |
| 実行時間 | 約3〜4時間 |
処理内容:
- 各コンテナがPlaywrightでGoogle検索を実行し、ランキングデータを取得
- 結果はスクレイパーDBに書き込み
- コンテナ完了時に
total_batch_doneカウンタをインクリメント
Step 4: reTry_Failed_Keywords
| 項目 | 値 |
|---|---|
| Lambda関数名 | reTry_Failed_Keywords |
| Runtime | Python 3.14 |
| Timeout | 900秒 |
| トリガー | 各Batchジョブ完了時にコールバック |
| 実行時間 | 約100ms(未完了時)/ 約2秒(全完了時) |
処理内容:
- 各Batchコンテナ完了時にコールバックとして呼び出される
- スクレイパーDBの
mst_systemsテーブルを確認:total_batch_doneとtotal_batch(=80)を比較 - 未完了の場合 → 即終了
- 全バッチ完了(80/80)の場合:
- 失敗キーワードのリトライのため
fetch_search_rankingを再呼び出し
- 失敗キーワードのリトライのため
Step 5: syncResultFetchSearch
| 項目 | 値 |
|---|---|
| Lambda関数名 | syncResultFetchSearch |
| Runtime | Python 3.14 |
| Timeout | 900秒 |
| Role | Lambda_scraper |
| メモリ | 5120 MB |
| 実行時間 | 約46秒 |
処理内容:
- Step 1: スクレイパーDBの検索結果を各システムDBに同期
mappy_search_ranking_exportsmappy_temp_search_rankings
- Step 2: プロセスデータの同期
mappy_search_ranking_processes— process_id の更新- 関連する exports と temp_rankings の process_id も更新
- Step 2.5: 更新ログの処理
mappy_search_ranking_update_logs— 当日分のレコードを更新(status=1)- 関連テーブルに
update_log_idを設定
- 後処理:
mst_systemsのtotal_batch_done=0、retry_count=0にリセットrun_timesを +1 インクリメント
タイムライン(日次実行例)
08:00 JST sync_database 開始
08:02 JST sync_database 完了 → SyncDatabaseCompleted イベント発行
08:02 JST fetch_search_ranking 起動 → 80 Batch jobs 投入
08:03 JST AWS Batch 実行開始(80コンテナ並列)
... 各Batch完了時 → reTry_Failed_Keywords コールバック
12:00 JST 全バッチ完了(80/80)
12:00 JST fetch_search_ranking(リトライ)起動
... 失敗キーワードのリトライBatch実行
12:30 JST syncResultFetchSearch 実行
12:31 JST 結果同期完了 → カウンタリセットPlaces API 統合後のフロー
Places APIバッチを既存フローに統合する。Places APIバッチはスクレイピングバッチと並列実行される。結果同期(syncResultFetchSearch)の条件が変更: スクレイピング(total_batch_done=total_batch)AND Places API の両方が完了している必要がある。
mst_systems テーブル変更
| カラム名 | 型 | 説明 |
|---|---|---|
total_batch | int | スクレイピングバッチ総数(データ量により変動) |
total_batch_done | int | スクレイピング完了バッチ数 |
retry_count | int | リトライ回数 |
places_api_done | int | (新規) Places APIバッチ完了フラグ(0=未完了、1=完了) |
既存フローからの変更点
- Places APIバッチを同じフローで実行(
fetch_search_rankingがスクレイピングバッチと一緒に投入) - Places APIバッチ完了時 → Lambdaイベントで
places_api_done = 1を更新 reTry_Failed_Keywordsの判定条件を変更:- 現在:
total_batch_done = total_batchのみ - 統合後:
total_batch_done = total_batchANDplaces_api_done = 1
- 現在:
- 両条件を満たした場合のみ →
syncResultFetchSearchを呼び出し、結果(スクレイピング + Places API)を3サーバーに同期
フロー図
処理詳細
- fetch_search_ranking: スクレイピングバッチとPlaces APIバッチを同時投入
- スクレイピングバッチ: Playwrightコンテナが並列実行、完了ごとに
total_batch_done++ - Places APIバッチ(places_api_def): Google Places API (Text Search) でランキング取得
- コンテナイメージ:
python310:latest data_source=1のロケーションのみ対象- 完了時 → Lambdaイベントで
places_api_done = 1を更新
- コンテナイメージ:
- reTry_Failed_Keywords(ロジック変更):
- 各バッチ完了時にコールバックとして呼び出し
mst_systemsを確認:total_batch_done = total_batchANDplaces_api_done = 1- 未達 → 即終了
- 両方完了 →
syncResultFetchSearchを呼び出し
- syncResultFetchSearch: スクレイピング + Places API の結果をスクレイパーDBから3サーバー(GCOR、GMAC、PIPIT)に同期
- 後処理:
total_batch_done=0、retry_count=0、places_api_done=0にリセット
タイムライン(統合後)
08:00 JST sync_database 開始
08:02 JST sync_database 完了 → SyncDatabaseCompleted
08:02 JST fetch_search_ranking 起動
→ Scraping Batch jobs 投入
→ Places API Batch job 投入
08:03 JST Scraping + Places API 並列実行
... 各Batch完了時 → reTry_Failed_Keywords コールバック
(判定: total_batch_done=total_batch AND places_api_done=1)
11:00 JST Places API Batch 完了 → Lambda で places_api_done=1 更新
12:00 JST Scraping Batch 全完了
12:00 JST 両条件達成 → syncResultFetchSearch 実行
12:01 JST 結果同期(Scraping + Places API)→ カウンタリセットLambda 関数一覧
| Lambda関数名 | 役割 | Timeout | メモリ |
|---|---|---|---|
sync_database | マスターデータ同期 | 900s | 128 MB |
fetch_search_ranking | Batch ジョブ投入 | 900s | 128 MB |
reTry_Failed_Keywords | バッチ完了ポーリング + リトライ | 900s | 128 MB |
syncResultFetchSearch | 結果データ同期 | 900s | 5120 MB |
checkBatchStatus | バッチステータス確認 | 3s | 128 MB |
cloudwatch-to-slack | エラー通知 → Slack | - | - |
関連リソース
- ECR:
881980194724.dkr.ecr.ap-northeast-1.amazonaws.com/scraper:latest - Batch Queue:
scrap_queue(Fargate) - Batch Compute Env:
scraper_env_v1 - CloudWatch Log Groups:
/aws/lambda/sync_database/aws/lambda/fetch_search_ranking/aws/lambda/reTry_Failed_Keywords/aws/lambda/syncResultFetchSearch/aws/batch/job