Skip to content

データ同期フロー(Sync Data Flow)

概要

GMAC MEOサービスでは、毎日定時にデータ同期と検索ランキングスクレイピングを自動実行する。 3つの顧客システム(GCOR、GMAC/Mappy、PIPIT)のマスターデータをスクレイパーDBに集約し、AWS Batchで並列スクレイピングを行い、結果を各システムに書き戻す。

対象システム

システムsystem_idDB クラスター用途
GMAC / Mappy1gmac-db-production-cluster-1メイン顧客DB
GCOR2gcor-db-production-cluster顧客DB
PIPIT3kingmeo-db-production-cluster顧客DB
Scraper-scraper.cluster-*中間処理DB

全体フロー

各ステップの詳細

Step 1: sync_database

項目
Lambda関数名sync_database
RuntimePython 3.13
Timeout900秒
RoleLambda_scraper
VPCあり(プライベートサブネット)
実行時間約2分

処理内容:

  1. GCOR、GMAC/Mappy、PIPIT の3つのDBに接続
  2. 既存データのバックアップテーブルを作成(backup_* テーブル)
  3. 以下のテーブルをスクレイパーDBに同期:
    • mappy_user_settings
    • mappy_gbp_locations
    • mappy_gbp_locations_store_codes
    • mappy_integrated_gbp_locations
    • mappy_keywords
  4. 完了後、EventBridge にカスタムイベントを発行:
    json
    {
      "detail-type": "SyncDatabaseCompleted",
      "source": "mappy.sync_db",
      "detail": {
        "status": "completed",
        "timestamp": "2026-04-07T17:02:13+09:00"
      }
    }

環境変数(NEXT_LAMBDA_FUNCTION_NAME): fetch_search_ranking — 次に呼び出すLambda関数名。

Step 2: fetch_search_ranking

項目
Lambda関数名fetch_search_ranking
RuntimePython 3.13
Timeout900秒
RoleTestCreateBatch-role-*
実行時間約47秒

処理内容:

  1. EventBridge イベント SyncDatabaseCompleted をトリガーとして起動
  2. 80個のAWS Batchジョブを投入:
    • Batch 1〜40: normal(通常検索)— 各350キーワード、Offset 0〜13,650
    • Batch 41〜80: reverse(逆検索)— 各350キーワード、Offset 0〜13,650
  3. 合計約28,000キーワードを並列処理

Step 3: AWS Batch(Fargate)

項目
ジョブ定義scraper_def
ジョブキューscrap_queue
コンピューティング環境scraper_env_v1(Fargate)
コンテナイメージ881980194724.dkr.ecr.ap-northeast-1.amazonaws.com/scraper:latest
vCPU1.0
メモリ2048 MB
並列数最大80コンテナ
実行時間約3〜4時間

処理内容:

  • 各コンテナがPlaywrightでGoogle検索を実行し、ランキングデータを取得
  • 結果はスクレイパーDBに書き込み
  • コンテナ完了時に total_batch_done カウンタをインクリメント

Step 4: reTry_Failed_Keywords

項目
Lambda関数名reTry_Failed_Keywords
RuntimePython 3.14
Timeout900秒
トリガー各Batchジョブ完了時にコールバック
実行時間約100ms(未完了時)/ 約2秒(全完了時)

処理内容:

  1. 各Batchコンテナ完了時にコールバックとして呼び出される
  2. スクレイパーDBの mst_systems テーブルを確認: total_batch_donetotal_batch(=80)を比較
  3. 未完了の場合 → 即終了
  4. 全バッチ完了(80/80)の場合:
    • 失敗キーワードのリトライのため fetch_search_ranking を再呼び出し

Step 5: syncResultFetchSearch

項目
Lambda関数名syncResultFetchSearch
RuntimePython 3.14
Timeout900秒
RoleLambda_scraper
メモリ5120 MB
実行時間約46秒

処理内容:

  1. Step 1: スクレイパーDBの検索結果を各システムDBに同期
    • mappy_search_ranking_exports
    • mappy_temp_search_rankings
  2. Step 2: プロセスデータの同期
    • mappy_search_ranking_processes — process_id の更新
    • 関連する exports と temp_rankings の process_id も更新
  3. Step 2.5: 更新ログの処理
    • mappy_search_ranking_update_logs — 当日分のレコードを更新(status=1)
    • 関連テーブルに update_log_id を設定
  4. 後処理:
    • mst_systemstotal_batch_done=0retry_count=0 にリセット
    • run_times を +1 インクリメント

タイムライン(日次実行例)

08:00 JST  sync_database 開始
08:02 JST  sync_database 完了 → SyncDatabaseCompleted イベント発行
08:02 JST  fetch_search_ranking 起動 → 80 Batch jobs 投入
08:03 JST  AWS Batch 実行開始(80コンテナ並列)
   ...     各Batch完了時 → reTry_Failed_Keywords コールバック
12:00 JST  全バッチ完了(80/80)
12:00 JST  fetch_search_ranking(リトライ)起動
   ...     失敗キーワードのリトライBatch実行
12:30 JST  syncResultFetchSearch 実行
12:31 JST  結果同期完了 → カウンタリセット

Places API 統合後のフロー

Places APIバッチを既存フローに統合する。Places APIバッチはスクレイピングバッチと並列実行される。結果同期(syncResultFetchSearch)の条件が変更: スクレイピング(total_batch_done=total_batch)AND Places API の両方が完了している必要がある。

mst_systems テーブル変更

カラム名説明
total_batchintスクレイピングバッチ総数(データ量により変動)
total_batch_doneintスクレイピング完了バッチ数
retry_countintリトライ回数
places_api_doneint(新規) Places APIバッチ完了フラグ(0=未完了、1=完了)

既存フローからの変更点

  1. Places APIバッチを同じフローで実行(fetch_search_ranking がスクレイピングバッチと一緒に投入)
  2. Places APIバッチ完了時 → Lambdaイベントplaces_api_done = 1 を更新
  3. reTry_Failed_Keywords の判定条件を変更:
    • 現在: total_batch_done = total_batch のみ
    • 統合後: total_batch_done = total_batch AND places_api_done = 1
  4. 両条件を満たした場合のみ → syncResultFetchSearch を呼び出し、結果(スクレイピング + Places API)を3サーバーに同期

フロー図

処理詳細

  1. fetch_search_ranking: スクレイピングバッチとPlaces APIバッチを同時投入
  2. スクレイピングバッチ: Playwrightコンテナが並列実行、完了ごとに total_batch_done++
  3. Places APIバッチ(places_api_def): Google Places API (Text Search) でランキング取得
    • コンテナイメージ: python310:latest
    • data_source=1 のロケーションのみ対象
    • 完了時 → Lambdaイベントで places_api_done = 1 を更新
  4. reTry_Failed_Keywords(ロジック変更):
    • 各バッチ完了時にコールバックとして呼び出し
    • mst_systems を確認: total_batch_done = total_batch AND places_api_done = 1
    • 未達 → 即終了
    • 両方完了 → syncResultFetchSearch を呼び出し
  5. syncResultFetchSearch: スクレイピング + Places API の結果をスクレイパーDBから3サーバー(GCOR、GMAC、PIPIT)に同期
  6. 後処理: total_batch_done=0retry_count=0places_api_done=0 にリセット

タイムライン(統合後)

08:00 JST  sync_database 開始
08:02 JST  sync_database 完了 → SyncDatabaseCompleted
08:02 JST  fetch_search_ranking 起動
           → Scraping Batch jobs 投入
           → Places API Batch job 投入
08:03 JST  Scraping + Places API 並列実行
   ...     各Batch完了時 → reTry_Failed_Keywords コールバック
           (判定: total_batch_done=total_batch AND places_api_done=1)
11:00 JST  Places API Batch 完了 → Lambda で places_api_done=1 更新
12:00 JST  Scraping Batch 全完了
12:00 JST  両条件達成 → syncResultFetchSearch 実行
12:01 JST  結果同期(Scraping + Places API)→ カウンタリセット

Lambda 関数一覧

Lambda関数名役割Timeoutメモリ
sync_databaseマスターデータ同期900s128 MB
fetch_search_rankingBatch ジョブ投入900s128 MB
reTry_Failed_Keywordsバッチ完了ポーリング + リトライ900s128 MB
syncResultFetchSearch結果データ同期900s5120 MB
checkBatchStatusバッチステータス確認3s128 MB
cloudwatch-to-slackエラー通知 → Slack--

関連リソース

  • ECR: 881980194724.dkr.ecr.ap-northeast-1.amazonaws.com/scraper:latest
  • Batch Queue: scrap_queue(Fargate)
  • Batch Compute Env: scraper_env_v1
  • CloudWatch Log Groups:
    • /aws/lambda/sync_database
    • /aws/lambda/fetch_search_ranking
    • /aws/lambda/reTry_Failed_Keywords
    • /aws/lambda/syncResultFetchSearch
    • /aws/batch/job