
Cloud Run 関数で処理がループする時の対処法
・ETLでもよく使われるCloud Run 関数
・例えば、Google Cloud Storageに保存したエクセルをBigQueryに取り込んでテーブル化できる
・ただし、大量のエクセルを処理する等で、処理済みのエクセルが再処理されループに陥りやすい
・処理済みのエクセルを退避フォルダへアーカイブすることでループを回避できる
ETLでGoogle Cloud Storageに保存したエクセルを、BigQueryでテーブル化する時に役立つのがCloud Run 関数です。
Cloud Run 関数は最近になって正式リリースされたもので、Google Cloud Functionsと聞けば、馴染みのある方も多いと思います。
現在、Google Cloud Functionsという製品ブランドは廃止され、Cloud Run Functionsという名前になりました。
そのCloud Run Functionsには第1世代と第2世代があり、いわゆる旧Google Cloud Functionsは第1世代にあたります。
第2世代が、今日ご紹介するCloud Run 関数というわけです。
もし、これから、ETL等でご利用される場合、Cloud Run 関数(旧Google Cloud Functionsの第2世代)の利用を強く推奨いたします。
第1世代は制約も多く、使えるメモリー等のパフォーマンスも劣ります。
大容量データを、複雑な処理でバッチ化する場合、第1世代ではタイムアウト時間も最大540秒と短く、処理が終わらずに停止するケースがよくあります。
第2世代なら、ハイパフォーマンスで処理できますので、様々なビジネス課題に対応することが可能です。
今日は、その最新のCloud Run 関数のお話です。
Cloud Run 関数の処理がループする
例えば、Google Cloud Storage「バケットA」にエクセルが100個あるとします。
100個のエクセルを一括でBigQueryにテーブル化したい時、Cloud Run 関数が便利です。
たいていの場合、以下のようにsource_bucketを指定し、1回の実行で済むように、エクセルを1つ処理して、また次のエクセルを処理・・のようにループさせると思います。
#ソースバケットの指定
source_bucket_name = 'bucket_a'
~~~
blobs = list(source_bucket.list_blobs())
for blob in blobs:
source_blob_name = blob.name
~~~
しかし、このまま実行すると、一度処理したエクセルも再実行するループが発生します。
エクセルが少なければ処理できると思います。
しかし、エクセルが100個あるような場合、この想定外ループにより、処理が進まず、Cloud Run 関数がタイムアウトしてしまいます。
これは、「バケットA」にあるエクセルをすべて処理するまで、Cloud Run 関数が動き続けるためです。
ファイルが少なければ、ループが発生しても、すべてのファイルが処理される場合が多いのですが、ファイルが多い場合、異常終了してしまいます。
やっかいなことに、このように異常終了した場合も、Cloud Run 関数のエラーログにはErrorが吐き出されないため、LoggingやMonitoringの検知もすり抜けてしまいます。
つまり、未処理であることにユーザが気づきづらいのです。
処理済みエクセルはバケットBへアーカイブ。バケットAからも消す。
このような場合、Google Cloud StorageにバケットBを作り、処理したエクセルをバケットBへコピーしてアーカイブします。
そして、処理したエクセルもバケットAから消します。
これにより、すべてのエクセルは1度しか処理されず、ループ処理は解消します。
バケットBへアーカイブせず削除してもかまいませんが、リカバリ等を考慮するとソースファイルは保護するのがベターです。
#ソースバケットの指定
source_bucket_name = 'bucket_a'
#アーカイブバケットの指定
archive_bucket_name = 'bucket_b'
~~~
blobs = list(source_bucket.list_blobs())
for blob in blobs:
source_blob_name = blob.name
~~
#処理済みファイルをアーカイブバケットにコピー
archive_bucket = storage_client.get_bucket(archive_bucket_name)
archive_blob = archive_bucket.blob(source_blob_name)
source_bucket.copy_blob(blob, archive_bucket, source_blob_name)
# ソースバケットから元ファイルを削除
blob.delete()
Google Cloud開発お承ります
datacompanyでは、お客様のご予算・環境に応じたクラウド開発を承ります。
お困りごとがございましたら是非ご相談ください。