AWS LambdaとGoを活用した大容量ファイル処理システムの構築手法
本記事では、AWS S3、Lambda、およびGoを組み合わせて、大容量ファイルを処理するためのスケーラブルなファイル処理システムの構築について詳しく解説します。AWS サービスとGoプログラミングを活用して、効率的でスケーラブルなファイル処理ソリューションを実現する方法について、その詳細をご紹介します。
直面した課題
私たちの取り組みは、2ギガバイトまでの大容量ファイルのアップロードを可能にするチャットアプリケーションの新機能開発から始まりました。ファイルのアップロード管理自体は単純に見えますが、アップロード後にこれらの大容量ファイルを圧縮し、パスワード保護されたZIP形式に変換してユーザーに提供するという要件が加わりました。この状況は独特な課題を提示し、私たちは様々な実装戦略を検討し、その経験から得られた知見を共有することにしました。
新しいアプローチが必要とされた理由
当初は、既存のAPIサーバーでファイルのアップロードと処理を行うことを検討しました。しかし、ファイルサイズが数キロバイトから2GBまでと大きく変動することを考慮すると、このアプローチがAPI可用性に深刻な影響を与える可能性が明らかになりました。複数の大容量ファイルが同時にアップロードされた場合のダウンタイムのリスクや、サーバータイムアウトの問題により、専用のソリューションが必要とされました。
アップロード管理用に別個のEC2インスタンスを使用するオプションも検討されましたが、リソースの事前割り当ての非効率性から早々に却下されました。このようなセットアップは、使用率が低いまたはない期間に不要なコストが発生するだけでなく、ピーク時の可用性も保証できないことが分かりました。
スケーラビリティと効率性を考慮したAWS Lambdaの選択
実行可能なソリューションを探求する中で、私たちはAWS Lambdaに辿り着きました。特にS3イベント通知がPUT操作やマルチパートアップロードなどの特定のS3イベントに応答してLambda関数をトリガーできる機能に強く影響を受けました。
これが私たちのアーキテクチャの基盤となり、ファイル処理の自動化を実現しました。このアプローチのスケーラビリティは、アップロードされた各ファイルが処理用の新しいLambda関数インスタンスをトリガーすることで実現されており、システムの高いスケーラビリティと効率性を確保しています。この動的なスケーラビリティは、最小限から大規模なアップロード量まで、パフォーマンスを損なうことなく、また不要なコストを発生させることなく処理するために重要です。
アーキテクチャ
私たちのシステムアーキテクチャは、AWSサービスとクライアントサイドの操作を組み合わせて、大容量ファイルのアップロードを最適化するように戦略的に設計されています。以下に、コンポーネント間の相互作用の概要を示します:
クライアントサイドのロジック
ユーザーがファイルアップロードを開始すると、クライアントアプリケーションはまずファイルサイズを評価し、処理パスを指定するための所定のプレフィックスをS3オブジェクトキーに付加します:
1GB以上のファイルには "large/"
256MBから1GBまでのファイルには "medium/"
256MB未満のファイルには "small/"
S3バケット
ファイルはS3バケットにアップロードされ、サイズプレフィックスに基づいてディレクトリに分類されます。この編成により、S3イベント通知を通じて、各ファイルサイズに適したLambda関数のトリガーが行われます。大容量ファイルの場合、マルチパートアップロードが特に有効で、ファイルを並列アップロード用のセグメントに分割することで、信頼性と速度を向上させます。S3アップロードの最適化、特にマルチパートアップロードを通じた最適化については、今後の記事で詳しく解説する予定です。最新の情報と洞察については、Mediumでフォローをお願いします。
Lambda関数
私たちのアーキテクチャでは、様々なサイズのファイルを処理するために3つのカスタマイズされたLambda関数を使用し、クラウド運用の効率性とコスト効果を確保しています。以下に、これらの関数の構造を詳しく見ていきましょう:
large_file_archiver:2048MBのRAMが割り当てられ、1GB以上のファイルを効率的に処理するために予約されています。
medium_file_archiver:1024MBのRAMで、256MBから1GBまでのファイルを対象とし、リソース割り当てと処理ニーズのバランスを取ります。
small_file_archiver:256MB未満のファイル用に設計され、256MBのRAMで動作し、小規模なデータ負荷に対してリソースを最適化します。
これらの関数は、それぞれのファイルサイズカテゴリに一致するマルチパートアップロードイベントによって正確にトリガーされます。重要な点として、エフェメラルストレージを利用可能な最小量である512MBに設定しており、ファイル処理にエフェメラルストレージを依存しない戦略を強調しています。さらにパフォーマンスを改善し、運用境界内でのタスクの確実な実行を保証するために、各Lambda関数は許容される最大タイムアウトで設定されています。この慎重な調整は、大容量ファイルの延長処理時間を管理するだけでなく、システムの信頼性と運用効率を向上させるために不可欠です。
この階層化されたアプローチには二重の目的があります:より大きなリソースを必要とするLambdaは、本当にそのようなリソースを必要とするファイルにのみ呼び出されることを保証することでAWSの課金を最小限に抑え、小さなファイルに高容量の関数を使用する不要な費用を避けることができます。同時に、この戦略は、非常に特定のファイルサイズ範囲に合わせた過度の数のLambda関数を持つことによる複雑さと潜在的な非効率性を防ぎ、これは管理上の課題とオーバーヘッドの増加につながる可能性があります。Lambda関数をファイルサイズに慎重にマッピングすることで、スケーラビリティ、コスト管理、およびすべてのタイプのファイルサイズにわたる効果的な処理をサポートする最適なバランスを実現しています。
APIサーバーとWebhookエンドポイント
処理後、Lambda関数はAPIサーバーの/handle-file-processing-completion webhookエンドポイントにPUTリクエストを発行します。このエンドポイントは、ファイル処理の成功を確認する通知機能を果たします。このリクエストには、サーバーが適切なアクションを取るための参照としてs3_object_keyがペイロードに含まれています。さらに、このエンドポイントは失敗シナリオを管理するように設定でき、成功した完了と処理エラーの両方に対する包括的な通知システムを提供します。この機能は、アップロードステータスに関するユーザー通知や、処理済みファイルのアプリケーションワークフローへの統合など、さらなるアクションをトリガーするために不可欠です。
このwebhookエンドポイントがファイル処理通知を処理する一方で、他のすべてのAPI機能は処理タスクの影響を受けないように別個に管理されています。セキュリティも優先事項であり、このエンドポイントを不正アクセスから保護するための堅牢な認証プロトコルが整備されています。
要約すると、このアーキテクチャは堅牢でスケーラブルなファイル処理システムを提供します。APIサーバーの応答性と信頼性を確保しながら、大容量ファイルのアップロードと処理タスクを効率的に管理するために、関心事を優雅に分離しています。後続のセクションでLambda関数の実装の詳細に深く掘り下げ、サーバーレスアプローチの複雑さをさらに明らかにしていきます。
Lambda関数の実装
Lambda関数の初期開発段階では、AWS S3からファイルを取得し、保護されたzip形式に圧縮して、新しいプレフィックス"uploads/"を付けてS3に再アップロードすることを目指しました。このアプローチにより、"large/"、"medium/"、"small/"のプレフィックスを持つファイルのみが処理を開始するため、S3イベントトリガーの無限ループを避けることができました。
すぐに、この操作が本質的に処理のために全ファイルをRAMにロードする必要があることに気づきました。この洞察により、一時的なファイル保持のためにエフェメラルストレージを使用する効率性について疑問が生じました。ファイルが処理のためにRAMにロードされることを考えると、中間ステップとしてエフェメラルストレージを使用することは冗長でリソースを消費すると思われました。そのため、バッファを活用して元のファイルデータを直接RAMに保持することを選択しました。このアプローチにより、エフェメラルストレージへの不必要な読み書きサイクルを排除し、リソースを節約することができました。
この最適化にもかかわらず、この方法は単一のファイルを処理するために最大10GBのRAMを必要とし、約4分かかることが分かりました。このしきい値以下にLambda関数のRAM割り当てを最小化しようとする試みは失敗に終わり、リソース効率と処理能力のバランスが重要であることが強調されました。この認識により、システムリソースを圧迫することなく、RAMの本来の機能を活用できるより効率的な処理戦略を探求する必要性が浮き彫りになりました。
この処理の実践的な実装に興味のある方のために、以下のコードスニペットで、Lambda関数内でこのタスクを実現した方法を詳しく説明します。
package main
import (
"bytes"
"context"
"fmt"
"io"
"path/filepath"
"strings"
"github.com/alexmullins/zip" // A library used for zip operations, including encryption
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// S3Event struct to parse the event data from S3
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"` // Bucket name
} `json:"bucket"`
Object struct {
Key string `json:"key"` // Object key
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// HandleRequest is the Lambda function handler
func HandleRequest(ctx context.Context, event S3Event) (string, error) {
cfg, err := config.LoadDefaultConfig(ctx) // Load AWS SDK configuration
if err != nil {
return "", fmt.Errorf("configuration error: %v", err)
}
s3Client := s3.NewFromConfig(cfg) // Initialize S3 client
for _, record := range event.Records {
bucket := record.S3.Bucket.Name
key := record.S3.Object.Key
// Retrieve the object from S3
resp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return "", fmt.Errorf("unable to retrieve object: %v", err)
}
defer resp.Body.Close()
buf := new(bytes.Buffer) // Create a buffer to write the zip archive
zipWriter := zip.NewWriter(buf) // Initialize zip writer
// Create a zip entry and encrypt it with a password
f, err := zipWriter.Encrypt(filepath.Base(key), "password")
if err != nil {
return "", fmt.Errorf("unable to create zip entry: %v", err)
}
// Copy the file data to the zip entry
_, err = io.Copy(f, resp.Body)
if err != nil {
return "", fmt.Errorf("unable to write file to zip: %v", err)
}
// Finalize the zip file
if err := zipWriter.Close(); err != nil {
return "", fmt.Errorf("unable to finalize zip file: %v", err)
}
newKey := "uploads/" + strings.TrimSuffix(filepath.Base(key), filepath.Ext(key)) + ".zip"
// Upload the zipped file back to S3
uploader := manager.NewUploader(s3Client)
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: &bucket,
Key: &newKey,
Body: bytes.NewReader(buf.Bytes()),
})
if err != nil {
return "", fmt.Errorf("unable to upload zipped file: %v", err)
}
// Delete the original object from S3
_, err = s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return "", fmt.Errorf("unable to delete object: %v", err)
}
}
return fmt.Sprintf("Successfully processed %d records.", len(event.Records)), nil
}
func main() {
lambda.Start(HandleRequest)
}
画期的なパフォーマンスを実現するGoの並行処理の活用
バッファベースのファイル処理の限界 - その高いリソース消費と遅いパフォーマンス - により、より効率的なアプローチが必要となりました。io.Pipe()とGoの並行処理メカニズムの統合は、大容量ファイルを扱うLambda関数の大幅な最適化をもたらします。パイプのリーダーをメインスレッドで、ライターを別のゴルーチンで実行することで、このメソッドはメモリ使用量を削減し処理を高速化し、大きなバッファを必要とせずにデータがアプリケーションを滑らかに流れることを可能にします。この並行セットアップにより、ライターゴルーチンがパイプにデータをストリーミングしている間、メインスレッドのリーダーが同時に処理を行い、パフォーマンスを大幅に向上させる滑らかで効率的なデータ処理プロセスを促進します。
以下は、この並行モデルを採用した更新されたコードです:
package main
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
"sync"
"github.com/alexmullins/zip"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
func HandleRequest(ctx context.Context, event S3Event) (string, error) {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return "", fmt.Errorf("configuration error: %v", err)
}
s3Client := s3.NewFromConfig(cfg)
for _, record := range event.Records {
bucket := record.S3.Bucket.Name
key := record.S3.Object.Key
resp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return "", fmt.Errorf("unable to retrieve object: %v", err)
}
defer resp.Body.Close()
pr, pw := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer pw.Close()
defer wg.Done()
zipWriter := zip.NewWriter(pw)
f, err := zipWriter.Encrypt(filepath.Base(key), "password")
if err != nil {
pw.CloseWithError(fmt.Errorf("unable to create zip entry: %v", err))
return
}
if _, err = io.Copy(f, resp.Body); err != nil {
pw.CloseWithError(fmt.Errorf("unable to write file to zip: %v", err))
return
}
if err := zipWriter.Close(); err != nil {
pw.CloseWithError(fmt.Errorf("unable to finalize zip file: %v", err))
return
}
}()
newKey := "uploads/" + strings.TrimSuffix(filepath.Base(key), filepath.Ext(key)) + ".zip"
uploader := manager.NewUploader(s3Client)
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: &bucket,
Key: &newKey,
Body: pr,
})
if err != nil {
return "", fmt.Errorf("unable to upload zipped file: %v", err)
}
wg.Wait() // Ensure the goroutine completes
// Delete the original object from S3
_, err = s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return "", fmt.Errorf("unable to delete object: %v", err)
}
return fmt.Sprintf("Successfully processed %d records.", len(event.Records)), nil
}
func main() {
lambda.Start(HandleRequest)
}
パフォーマンスメトリクスの比較
以下は、2.22GBファイルの処理時間とメモリ使用量について、以前のバッファベースのアプローチと新しい並行処理ベースの戦略を比較したものです:
これらのメトリクスは、並行処理ベースのアプローチによる効率性の向上を鮮明に示しています。特に注目すべきは、2.22GBファイルの処理時間が約3.5分から約1.3分に短縮されたことです。さらに、メモリ消費量は10GBのRAM割り当てで8000MB以上から、並行処理を使用した場合の2GBのRAMで大幅に減少しました。この変更は、Goの並行処理の力を実証するだけでなく、コスト削減と処理速度向上の可能性も強調しています。
まとめ
このプロジェクトは、クラウドベースのファイル処理システムを成功裏に強化し、よりユーザーニーズに応答的なものにしました。Goの並行処理を活用しデータ処理を効率化することで、大幅なパフォーマンス向上を達成しました。この取り組みは、技術的ソリューションの継続的な改善の追求を強調し、期待を満たすだけでなく、それを超えることを確実にしています。今後を見据えると、この成功が触発する将来のイノベーションと改善に期待が高まります。
この記事は、弊社エンジニアの Rodan Ramdam が 2024 年 4 月に執筆し、日本語に翻訳したものです。
英語版はこちらをクリックしてください。
https://articles.wesionary.team/mastering-large-file-processing-with-aws-s3-lambda-and-go-3dde0a4c29c6
採用情報
私たちはプロダクト共創の仕組み化に取り組んでいます。プロダクト共創をリードするプロダクト・マネージャー、そして、私たちのビジョンを市場に届ける営業メンバーを募集しています!
開発パートナーをお探しの企業様へ
弊社は、グローバル開発のメリットを活かし、高い費用対効果と品質を両立しています。経験豊富で多様性のあるチームが、課題を正しく理解し、最適なシステムと優れた体験を実現します。業務システムの開発、新規事業の開発、業務効率化やDX化に関するお困りごと、ぜひ弊社にご相談ください。