Cloud Run 関数で処理がループする時の対処法

Cloud Run 関数で処理がループする時の対処法

・ETLでもよく使われるCloud Run 関数
・例えば、Google Cloud Storageに保存したエクセルをBigQueryに取り込んでテーブル化できる
・ただし、大量のエクセルを処理する等で、処理済みのエクセルが再処理され異常ループに陥りやすい
・処理済みのエクセルを退避フォルダへアーカイブすることでループを回避できる

ETLでGoogle Cloud Storageに保存したエクセルを、BigQueryでテーブル化する時に役立つのがCloud Run 関数です。

Cloud Run 関数は最近になって正式リリースされたもので、Google Cloud Functionsと聞けば、馴染みのある方も多いと思います。

現在、Google Cloud Functionsという製品ブランドは廃止され、Cloud Run Functionsという名前になりました。

そのCloud Run Functionsには第1世代と第2世代があり、いわゆる旧Google Cloud Functionsは第1世代にあたります。

第2世代が、今日ご紹介するCloud Run 関数というわけです。

もし、これから、ETL等でご利用される場合、Cloud Run 関数(旧Google Cloud Functionsの第2世代)の利用を強く推奨いたします。

第1世代は制約も多く、使えるメモリー等のパフォーマンスも劣ります。

大容量データを、複雑な処理でバッチ化する場合、第1世代ではタイムアウト時間も最大540秒と短く、処理が終わらずに停止するケースがよくあります。

第2世代なら、ハイパフォーマンスで処理できますので、様々なビジネス課題に対応することが可能です。

今日は、その最新のCloud Run 関数のお話です。

Cloud Run 関数の処理が異常ループする

例えば、Google Cloud Storage「バケットA」にエクセルが100個あるとします。

100個のエクセルを一括でBigQueryにテーブル化したい時、Cloud Run 関数が便利です。

たいていの場合、以下のようにsource_bucketを指定し、1回の実行で済むように、エクセルを1つ処理して、また次のエクセルを処理・・のようにループさせると思います。

#ソースバケットの指定
source_bucket_name = 'bucket_a'

~~~

blobs = list(source_bucket.list_blobs())

for blob in blobs:
source_blob_name = blob.name

~~~

しかし、このまま実行すると、一度処理したエクセルも再実行する異常ループが発生します。

エクセルが少なければ処理できると思います。

しかし、エクセルが100個あるような場合、この異常ループにより、処理が進まず、Cloud Run 関数がタイムアウトしてしまいます。

これは、「バケットA」にあるエクセルをすべて処理するまで、Cloud Run 関数が動き続けるためです。

ファイルが少なければ、異常ループが発生しても、すべてのファイルが処理される場合が多いのですが、ファイルが多い場合、異常終了してしまいます。

やっかいなことに、このように想定外で終了した場合も、Cloud Run 関数のエラーログにはErrorが吐き出されないため、LoggingやMonitoringの検知もすり抜けてしまいます。

つまり、未処理であることにユーザが気づきづらいのです。

処理済みエクセルはバケットBへアーカイブ。バケットAからも消す。

このような場合、Google Cloud StorageにバケットBを作り、処理したエクセルをバケットBへコピーしてアーカイブします。

そして、処理したエクセルもバケットAから消します。

これにより、すべてのエクセルは1度しか処理されず、異常ループは解消します。

バケットBへアーカイブせず削除してもかまいませんが、リカバリ等を考慮するとソースファイルは保護するのがベターです。

#ソースバケットの指定
source_bucket_name = 'bucket_a'
#アーカイブバケットの指定
archive_bucket_name = 'bucket_b'

~~~

blobs = list(source_bucket.list_blobs())

for blob in blobs:
source_blob_name = blob.name

~~


#処理済みファイルをアーカイブバケットにコピー
archive_bucket = storage_client.get_bucket(archive_bucket_name)
archive_blob = archive_bucket.blob(source_blob_name)
source_bucket.copy_blob(blob, archive_bucket, source_blob_name)

# ソースバケットから元ファイルを削除
blob.delete()
Google Cloud開発お承ります

datacompanyでは、お客様のご予算・環境に応じたクラウド開発を承ります。

お困りごとがございましたら是非ご相談ください。

お問い合わせフォーム

BigQuery Data Transfer Service(DTS)の限界

BigQuery Data Transfer Service(DTS)の限界

・誰でも簡単にノーコードでBigQueryにデータ連携できることが魅力的
・例えば、Google Cloud Storageと連携すれば、CSVをアップロードする度にBigQueryも自動更新
・ただし、CSVのカラムを増やしたりするとエラーになる Excelも未対応
・工数はかかるが安定運用や長期的な工数を考慮すると、Cloud Runでフルスクラッチ開発がベター
・Cloud Runなら、カラムが変動しても、動的にBigQueryのテーブルを更新できる

BigQueryのようなDWHに社内外データを集約したい時に役立つのがBigQuery Data Transfer Service(DTS)です。

馴染みのない名前かもしれませんが、Google広告のデータをBigQueryに連携する時など、知らないうちに利用していたという方も多いと思います。

DTSは、たびたびアップデートされており、連携できるサービスも多数あります。

Google広告やYoutube、Google Cloud Storage(GCS)等のGoogle系サービスをはじめ、AWS(S3・Redshift等)やTeradata、最近ではSalesforceやMeta広告のプレビュー版が提供されています。

Salesforceのプレビュー版についてはこちらの記事もお勧めです。

AWS(Redshift)とBigQueryの連携についてはこちらの記事もお勧めです。

DTSのユースケース Google Cloud Storage(GCS)とBigQueryを連携

DTSで、よく使われているケースとして、GCSとBigQueryの連携があります。

特に、KPIをトラッキングする企業様において、目標値や実績等をCSVで管理されており、それらをBigQueryに連携して、他データと統合分析する場合に役立ちます。

実装もとても簡単です。

任意のCSVを用意します。1行目に変数名、2行目以降にデータを入力します。

GCSの任意のバケットにアップロードします。

アップロードしたCSVをクリックします。

このマークをクリックし、パスをコピーします。

BigQueryをひらきます。データセット横のマークをクリックし、テーブルを作成します。

テーブルの作成元「Google Cloud Storage」を選択し、GCSバケットの欄に、コピーしたGCSのパスをペーストします。

テーブルに任意の名前を入力します。画像加工によりプロジェクトがブランクになっていますが、プロジェクトはご自身のプロジェクト名が入ります。

スキーマをお好みで設定します。複雑なデータでなければ自動検出で可能です。

1行目をスキップし、変数名がデータとして取り込まれないようにします。

作成ボタンをクリックし、テーブルを確認します。成功すれば、このようにCSVが取り込まれます。

BigQueryの画面左「データ転送」→「転送を作成」をクリックします。

ソース「Google Cloud Storage」を選択し、任意の転送構成名を入力します。

データセットやGCSのCSV等を同じ要領で設定し、保存をクリックします。

設定が完了すると、画面が切り替わり、転送を開始します。

成功すると、緑色のチェックマークが点灯します。

DTSには限界もある

このように設定すると、CSVのレコードが増えても、GCSにCSVをアップロードしておけば、設定したタイミングでBigQueryに自動で反映します。

KPIをHourlyや日次で管理している企業様の場合、ノーコードで簡単な操作だけで、このようなデータ連携を構築できます。

しかし、DTSには限界もあります。

例えば、先ほどのCSVのD列にVar4を追加します。

GCSにアップロードし、上書きします。

DTSの画面で、今すぐ転送を実行します。これによりGCSのデータがBigQueryに手動で反映できます。

しばらくすると、処理がエラーになります。

エラーの内容です

Error while reading data, error message: CSV processing encountered too many errors, giving up. Rows: 0; errors: 3; max bad: 0; error percent: 0; JobID:...省略

many erros たくさんのエラーが発生し、正常に読み込まれたRowsレコードは0行です。

これは、CSVのカラムを増やしたことが原因です。

DTSは、CSVの仕様が変わると、このようなとエラーになります。

弊社のお客様には、社内データをCSVで管理されており、CSVのカラムを定期的に増やす企業様もおられます。

また、Excelで管理されている企業様もおられます。

そのようなケースの場合、DTSでは対応できません。

Cloud Runのフルスクラッチでカスタマイズ対応


このようなケースの場合、初期工数はかかりますが、Cloud Runのフルスクラッチ開発がベターになります。

Cloud Runなら、カラムが増えてもエラーにならず、動的にBigQueryのテーブルを更新できます。

特定セルの値だけBigQueryに取り込むことも可能で、Excelも取り込めます。

安定運用や長期的な工数軽減、柔軟に対応したい場合等を考慮すると、DTSより優れている場合が多いと思います。

Cloud RunはPythonに対応。様々なケースに応じた処理ができる。

Google Cloud開発お承ります

datacompanyでは、お客様のご予算・環境に応じたクラウド開発を承ります。

お困りごとがございましたら是非ご相談ください。

お問い合わせフォーム