カミナシ エンジニアブログ

株式会社カミナシのエンジニアが色々書くブログです

フロントエンドから Amazon S3 にマルチパートアップロードしたい

はじめに

Presigned URL(*) などで、Amazon S3 へのアップロード処理を実装していると、大きなサイズのファイルをアップロードしようとしたときに、以下のような課題に直面することがあります。

  • 一回のPUT リクエストでアップロードできるサイズの上限が 5GB まで
  • 単一の HTTP リクエストでアップロードするため、大きなサイズをアップロードしようとしたときに問題が起きる。例えば、アップロードの処理の途中で失敗したとき、最初からやり直しになる。

このようなときに活用したいのが、マルチパートアップロードです。マルチパートアップロードとは、その名の通り、アップロード対象のオブジェクトを小分けにしてアップロードする方法です。

AWS の SDK には、マルチパートアップロードが簡単に行えるような API が用意されているものの、多くは、S3 にアップロードを行うことができる IAM クレデンシャルを有するアプリケーションでの利用が想定されているように見えます。 Presigned URL を利用した単一の PUT オペレーションによるファイルアップロードと同様、フロントエンドアプリケーションから直接アップロードできた方が、パフォーマンス面などで利点が大きいのでは?と思いました。そこで、フロントエンドからマルチパートアップロードをする方法を探ってみました。

*Presigned URL や、より柔軟な Post Policy については以下の記事もご確認ください

kaminashi-developer.hatenablog.jp

マルチパートアップロードとは?

マルチパートアップロードとは、アップロード対象のオブジェクトを小分けにしてアップロードする方法です。例えば、100 MB のファイルを、10MB * 10 個のペイロード(パート)にわけ、アップロードを行うようなイメージです。

マルチパートアップロードには、以下のような利点があり、アプリケーションのパフォーマンスや信頼性の向上に役立ちます。

  • part を並列にアップロードすることでスループットの向上が期待できる
  • ネットワークエラーなどで、一部の part のアップロードが失敗した場合でも、失敗したもののみ、リトライすれば良い。
  • ネットワークが不安定になった際などに、アップロードを途中で停止し、再開するなどの柔軟な制御もできる
  • アップロード対象のオブジェクトのサイズが未定(オブジェクト自体を生成中)であってもアップロードを開始できる

マルチパートアップロード処理は、「1. アップロード開始」 →「2. ファイルを小分にしてアップロード」→「3. アップロード終了」の合計 3 step の API コールで実装します。

マルチパートアップロードのフロー

Amazon S3 の User Guide では、オブジェクトのサイズが、100 MB に達する場合はマルチパートアップロードを検討した方が良いとされています。

フロントエンドからマルチパートアップロードをする方法

本記事では、以下2つの方法を紹介します。次項以降で、実装例を踏まえてそれぞれ詳しくみていきます。実装にはバックエンドは Go、フロントエンドは TypeScript を使用しています。

🗒️ 方法1: 一時的なIAM クレデンシャルをフロントエンドアプリケーションに渡すパターン。それを用いてフロントエンドで 3 step の処理を実行する

方法1のフロー

🗒️ 方法2: フロントエンドでは Presigned URL による UploadPart のみを行うパターン。アップロードの開始・終了はバックエンドで行い、バックエンドで part 分の Presigned URL を発行。フロントエンドでは UploadPart のみを実行する

方法2のフロー

方法1: 一時的なIAM クレデンシャルをフロントエンドアプリケーションに渡すパターン

この方法では、Amazon Cognito の Developer authenticated identities authflow を用いて、フロントエンドアプリケーションが IAM クレデンシャルを取得し、マルチパートアップロードの一連のシーケンスを実行します。

AWS リソースの構築方法や、ソースコードについては、Appendix をご確認ください。以下で要点を説明します。

以下はバックエンドで、Amazon Cognito から、OpenID Token を取得するコードの例です。ここで取得した OpenID Token をフロントエンドに受け渡します。

func (h OpenIDTokenHandler) getOpenIDToken(ctx context.Context) (GetOpenIDTokenResponse, error) {

    identityPoolID := os.Getenv("AWS_IDENTITY_POOL_ID")
    loginProvider := os.Getenv("AWS_LOGIN_PROVIDER")
    loginName := getLoginName(ctx)

    resp, err := h.client.GetOpenIdTokenForDeveloperIdentity(ctx, &cognitoidentity.GetOpenIdTokenForDeveloperIdentityInput{
        IdentityPoolId: aws.String(identityPoolID),
        Logins: map[string]string{
            loginProvider: loginName,
        },
        TokenDuration: aws.Int64(15 * 60),
        PrincipalTags: map[string]string{
            "loginName": loginName,
        },
    })

    if err != nil {
        return GetOpenIDTokenResponse{}, err
    }

    bucketName := os.Getenv("AWS_BUCKET")
    region := os.Getenv("AWS_REGION")

    return GetOpenIDTokenResponse{
        OpenIDToken: OpenIDToken{
            IdentityID: *resp.IdentityId,
            Token:      *resp.Token,
        },
        Region:    region,
        Bucket:    bucketName,
        KeyPrefix: loginName,
    }, nil
}

以下は、フロントエンドにおいて、バックエンドから受け取った OpenID Token を用いて IAM Credential を取得する例です。

   const openIDTokenResponse = await fetch(
        `${API_ORIGIN}/openIDToken`,
    ).then<GetOpenIDTokenResponse>((response) => response.json());

    const {
        openIDToken: { identityId, token },
        region,
        bucket,
        keyPrefix,
    } = openIDTokenResponse;

    const cognitoClient = new CognitoIdentityClient({ region });
    const getCredentialsCommand = new GetCredentialsForIdentityCommand({
        IdentityId: identityId,
        Logins: {
            "cognito-identity.amazonaws.com": token,
        },
    });
    const cognitoResponse = await cognitoClient.send(getCredentialsCommand);

これでやっと、フロントエンドで AWS の API を呼び出す準備が整います。以下のコードは、マルチーパートアップロードのシーケンスを実行している例です。

   let uploadId: string | undefined = undefined;
    const objectKey = `${keyPrefix}/${uuidv4()}`;

    try {
        // 1. アップロード開始
        const multipartUpload = await s3Client.send(
            new CreateMultipartUploadCommand({
                Bucket: bucket,
                Key: objectKey,
            }),
        );

        uploadId = multipartUpload.UploadId;

        const uploadPromises: Promise<{
            ETag: string;
            partNumber: number;
        }>[] = [];

        // 2. ファイルを part 毎にアップロード
        for (let i = 0; i < file.size; i += PART_SIZE) {
            const part = file.slice(i, i + PART_SIZE);
            const partNumber = i / PART_SIZE + 1;
            const uploadPartPromise = s3Client
                .send(
                    new UploadPartCommand({
                        Bucket: bucket,
                        Key: objectKey,
                        PartNumber: partNumber,
                        UploadId: uploadId,
                        Body: part,
                    }),
                )
                .then((response) => ({
                    ETag: response.ETag ?? "",
                    partNumber,
                }));
            uploadPromises.push(uploadPartPromise);
        }

        const uploadResponses = await Promise.all(uploadPromises);

        // 3. アップロード完了
        const completion = await s3Client.send(
            new CompleteMultipartUploadCommand({
                Bucket: bucket,
                Key: objectKey,
                UploadId: uploadId,
                MultipartUpload: {
                    Parts: uploadResponses.map((response) => ({
                        ETag: response.ETag,
                        PartNumber: response.partNumber,
                    })),
                },
            }),
        );

        console.log({ completion });
    } catch (error) {
        // エラーが発生した時の処理
    }

以上が、マルチパートアップロードを完了するまでの一連の流れです。もし、アップロード開始〜アップロード中にエラーが発生するなどし、アップロード処理を途中で終了したいときには、以下のように、AbortMultipartUpload を実行します。

   try {
        ….
    } catch (error) {
        console.error(error);

        if (uploadId) {
            await s3Client.send(
                new AbortMultipartUploadCommand({
                    Bucket: bucket,
                    Key: objectKey,
                    UploadId: uploadId,
                }),
            );
        }
    }

なお、今回は Principal Tag を用いることで、ログインユーザー毎にディレクトリ(正確にはオブジェクトのプレフィックス)を、切り分けています。

後の考察でも触れようと思いますが、この方法の欠点は、アップロードするオブジェクトの key をフロントエンドで決定していることです。Path Traversal を避けるため、よほど特別な条件が揃わない限り、フロントエンドでオブジェクトのキーが決定されるような実装は避けた方が良いでしょう。

方法2: フロントエンドでは Presigned URL による UploadPart のみを行うパターン

フロントエンドでは、Presigned URL による UploadPart のみを行い、それ以外の AWS API コールは、フロントエンドからの要求に応じて、バックエンドで行う方法です。 前項と同様、AWS リソースの構築方法や、ソースコードについては、Appendix をご確認ください。以下で要点を説明します。

以下はバックエンドで、CreateMultipartUpload を行っているコード例です。

func (h MultipartUploadHandler) createMultipartUpload(ctx context.Context) (MultipartUploadTarget, error) {

    key := uuid.NewString()
    bucketName := os.Getenv("AWS_BUCKET")
    resp, err := h.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
        Bucket: &bucketName,
        Key:    &key,
    })

    if err != nil {
        return MultipartUploadTarget{}, err
    }

    return MultipartUploadTarget{
        UploadID: *resp.UploadId,
        Bucket:   bucketName,
        Key:      key,
    }, nil
}

以下は、バックエンドで、Presigned URL を生成している例です。アップロード対象の part の数は、フロントエンドでカウントし、その情報を元にバックエンドで part 数分の Presigned URL を生成し、フロントエンドに渡します。

func (h MultipartUploadHandler) createUploadPartURLInfos(ctx context.Context, multipartUploadTarget MultipartUploadTarget, partCount int) ([]UploadPartURLInfo, error) {
    uploadPartURLInfos := make([]UploadPartURLInfo, 0, partCount)
    for i := 0; i < partCount; i++ {
        partNumber := int32(i + 1)
        uploadPartURL, err := h.createPresignedURL(ctx, multipartUploadTarget, partNumber)
        if err != nil {
            return nil, err
        }
        uploadPartURLInfos = append(uploadPartURLInfos, UploadPartURLInfo{
            PartNumber:    partNumber,
            UploadPartURL: uploadPartURL,
        })
    }
    return uploadPartURLInfos, nil
}

func (h MultipartUploadHandler) createPresignedURL(ctx context.Context, multipartUploadTarget MultipartUploadTarget, partNumber int32) (string, error) {

    resp, err := s3.NewPresignClient(h.client).PresignPutObject(ctx, &s3.PutObjectInput{
        Bucket: &multipartUploadTarget.Bucket,
        Key:    &multipartUploadTarget.Key,
    }, func(options *s3.PresignOptions) {
        options.Expires = time.Duration(60 * time.Second)
        originalPresigner := options.Presigner
        options.Presigner = uploadPartPresigner{
            uploadId:   multipartUploadTarget.UploadID,
            partNumber: partNumber,
            orig:       originalPresigner,
        }
    })

    if err != nil {
        return "", err
    }

    return resp.URL, nil
}

Presigned URL を生成する時点で、クエリパラメータとして partNumber、uploadId を指定しなければならない点には注意が必要です。本例では、AWS SDK for Go v2 を使っていますが、今回は以下のようにして、presigner をカスタマイズし、実現しています。

type uploadPartPresigner struct {
    uploadId   string
    partNumber int32
    orig       s3.HTTPPresignerV4
}

func (pw uploadPartPresigner) PresignHTTP(
    ctx context.Context, credentials aws.Credentials, r *http.Request,
    payloadHash string, service string, region string, signingTime time.Time,
    optFns ...func(*v4.SignerOptions),
) (url string, signedHeader http.Header, err error) {
    q := r.URL.Query()
    q.Add("uploadId", pw.uploadId)
    q.Add("partNumber", fmt.Sprintf("%d", pw.partNumber))
    r.URL.RawQuery = q.Encode()
    return pw.orig.PresignHTTP(ctx, credentials, r, payloadHash, service, region, signingTime, optFns...)
}

以下は、フロントエンドにおいて、バックエンドから得た Presigned URL を用いて、UploadPart を行っているコードです。

   try {
        const uploadPromises: Promise<{ ETag: string; partNumber: number }>[] =
            [];
        const partChecksums: ArrayBuffer[] = [];

        // 2. ファイルを part 毎にアップロード
        for (let i = 0; i < parts.length; i++) {
            const partContent = parts[i].content;
            const partChecksum = parts[i].checksum;
            const partNumber = uploadPartURLInfos[i].partNumber;
            const uploadPartURL = uploadPartURLInfos[i].uploadPartURL;
            partChecksums.push(partChecksum);

            const uploadPartFetch = () =>
                fetch(`${uploadPartURL}`, {
                    method: "PUT",
                    headers: {
                        "Content-Type": "application/octet-stream",
                    },
                    body: partContent,
                })
                    .then((res) => {
                        if (!res.ok) {
                            throw new Error("Failed to upload part");
                        }
                        const eTag = res.headers.get("ETag");
                        const expectedETag = `"${buf2hex(partChecksum)}"`;
                        if (eTag !== expectedETag) {
                            console.warn(
                                `ETag mismatch: expected ${expectedETag}, got ${eTag}`,
                            );
                        }
                        return { eTag };
                    })
                    .then(({ eTag }) => ({
                        ETag: eTag ?? "",
                        partNumber,
                    }));

            uploadPromises.push(retry(uploadPartFetch, 0));
        }

        const uploadResponses = await Promise.all(uploadPromises);

        const checksumOfChecksumsString = calcChecksum(...partChecksums).hex();

        // 3. アップロード完了
        ……
    } catch (error) {
        // エラーが発生した時の処理
    }

フロントエンドで、UploadPart が完了したあとは、バックエンドで CompleteMultiplartUpload を行います。今回の実装例では、簡単のため、フロントエンドから与えられた uploadId、bucket 名、オブジェクトの key をそのまま処理で使っていますが、実際のアプリケーションでは、想定外の値が与えられないように、バリデーションをかけるのが望ましいでしょう。

func (h MultipartUploadHandler) completeMultipartUpload(ctx context.Context, req CompleteMultipartUploadRequest) error {

    completeParts := make([]types.CompletedPart, 0, len(req.Parts))
    for _, p := range req.Parts {
        completeParts = append(completeParts, types.CompletedPart{
            ETag:       &p.ETag,
            PartNumber: &p.PartNumber,
        })
    }

    resp, err := h.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
        Bucket:   &req.Bucket,
        Key:      &req.Key,
        UploadId: &req.UploadID,
        MultipartUpload: &types.CompletedMultipartUpload{
            Parts: completeParts,
        },
    })

    if err != nil {
        return err

    }

    return err
}

アップロード開始〜アップロード中にエラーが発生するなどし、アップロード処理を途中で終了したいときには、以下のように、バックエンドで AbortMultipartUpload を実行します。

func (h MultipartUploadHandler) abortMultipartUpload(ctx context.Context, req AbortMultipartUploadRequest) error {
    _, err := h.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
        Bucket:   &req.Bucket,
        Key:      &req.Key,
        UploadId: &req.UploadID,
    })
    return err
}

その他留意事項

マルチパートアップロードを実装する上では、以下のような点にも注意が必要です。(一部「フロントエンドからの」マルチパートアップロード特有ではないものも列挙しています)

📝 マルチパートアップロードに必要な IAM Policy

同一の IAM クレデンシャルを有する主体が、CreateMultiPartUpload、UploadPart、CompleteMultipartUpload | AbortMultiplartUpload をするだけであれば、以下のように、対象のバケットにs3:PutObject を許可するだけで事足ります。CreateMultiPartUpload、UploadPart を実行するためには、s3:PutObject を許可されていればよく、また、デフォルトでは、マルチパートアップロードを開始した主体は、AbortMultiplartUpload も許可されるためです。

詳細は、Amazon S3 User Guide の Multipart upload API and permissions をご確認ください。

{
    "Statement": [
        {
            "Action": "s3:PutObject",
            "Effect": "Allow",
            "Resource": "arn:aws:s3:::${YOUR_BUCKET_NAME}/*"
        }
    ],
    "Version": "2012-10-17"
}

📝 チェックサムの方法

アップロード時のリクエストヘッダに計算したチェックサムを含める、アップロード後の Etag の値を検証する、などいくつか方法があるようです。今回は一部うまく実装・検証できていないものがあり、引き続き調査しようと考えています。

今回のサンプル実装では、ETag の値をアップロード後に検査し、誤りを検知する実装にとどめました。コードについては、Appendix をご確認ください。

チェックサムについての詳細は、Amazon S3 User Guide の Checking object integrity をご確認ください。

📝 アップロード対象のサイズに関する制約

Presigned URL などを用いた単一の PUT リクエストでは、最大 5 GB のオブジェクトをアップロードできますが、マルチパートアップロードでは最大で 5 TB (正確には 5 TiB)のオブジェクトをアップロードできます。

  • Part 数(分割数)の最大は 10000
  • Part サイズの最小は 5 MiB

などの制約に気をつけて、何 MiB/GiB 単位で分割するかを決める必要があります。

5 MiB 未満のオブジェクトをマルチパートアップロードのフローでアップロードすることも可能です。アプリケーションを実装する際に、オブジェクトのサイズが、5 MiB 未満かどうかによって、単一の PUT リクエストを用いるか、マルチパートアップロードを用いるかを分岐させる必要はありません。

サイズなどの制限についての詳細は、Amazon S3 User Guide の Amazon S3 multipart upload limits をご確認ください。

📝 UploadPart の並列数

今回のサンプル実装では、UploadPart の際の並列数を特に制限していませんが、利用環境次第では並列数を制限した方が良いかもしれません。例えば、帯域があまり広くない環境での利用が想定される場合は、並列数を少なめ(1〜2 程度)に抑えて、少数の端末のマルチパートアップロードだけで帯域を消費し尽くさない、といった考慮が必要な場合もあるでしょう。

📝 UploadPart でリトライした結果、同一の part が複数 PUT されても大丈夫か?

実際のアプリケーションでは、各パートのアップロードに失敗した場合、リトライするような処理を実装するケースも多いと思われます。ここで気になるのが、同一の part のアップロードが複数回実行された場合でも、part の重複などが発生し、不正なデータにならないか?ですが、この辺は問題ないようです。

Amazon S3 User Guide の Multipart upload process には、「If you upload a new part using the same part number as a previously uploaded part, the previously uploaded part is overwritten.」(「以前にアップロードしたパートと同じパート番号を使って新しいパートをアップロードした場合、以前のパートは上書きされます。」)と書かれています。

また、実際に同一の part number を複数アップロードするような処理を実装して、動作確認してみましたが、問題なく動作しました。

📝 マルチパートアップロードを開始したあと、Complete も Abort もしなかったらどうなるか?

マルチパートアップロードを Complete または Abort するまで、全ての part が Amazon S3 上に保持されるため、マルチパートアップロードを開始し、part のアップロードを行った後に、Complete も Abort もしなかった場合、無用にストレージコストがかかってしまいます。

そのため、「不完全なマルチパートアップロードが存在する場合、一定期間経過後、それらを削除する」ライフサイクルルールを設定することが推奨されています。具体的には以下のようなルールを設定します。以下は terraform で設定する例です。

resource "aws_s3_bucket_lifecycle_configuration" "s3_bucket_lifecycle_configuration" {
  bucket = aws_s3_bucket.s3_bucket.id
  rule {
    id     = "abort-incomplete-multipart-upload-after-7-days"
    status = "Enabled"
    abort_incomplete_multipart_upload {
      days_after_initiation = 7
    }
  }
}

詳細は、Amazon S3 User Guide の Multipart upload and pricing をご確認ください。

📝 異なる uploadId から、同一の key に対してアップロードが行われた時はどうなるか?

手元で動作させてみた限り、マルチパートアップロードの「スタートが早い方」のオブジェクトが優先されるかのような動作でしたが、Amazon S3 User Guide の Concurrent multipart upload operations を読む限りは、「後勝ちか、先勝ちか?」などが明確には定まっていないように読み取れました。

このあたりの仕様に厳密さが求められる場合、バケットのバージョニングを有効化する・そもそも複数クライアントから同時に同一の key へのアクセスが発生しづらい実装にする、などの対策が必要でしょう。

比較・考察

前項までで、フロントエンドからのマルチパートアップロードの2つの方法を紹介しました。この2つの方法のうち、どちらを採用すれば良いのでしょうか?

「方法1」の方が、マルチパートアップロード自体のシーケンスが理解しやすくシンプルで、その点では良いようにも思われます。 しかし、よほど特殊な条件が整わない限り、「方法2」の「フロントエンドでは Presigned URL による UploadPart のみを行う」パターンを採用した方が良いと考えます。

最大の理由はセキュリティです。「方法1」では、フロントエンドでアップロードするオブジェクトのキーが決定できるようになっています。これは、Path Traversal のリスクがあり、通常は避けたいところです。

ただし、「オブジェクトのキーを動的にする必要がないため、IAM Policy などを工夫し、実質的にキーを強制できる」など、リスクが許容できるものである場合、「方法1」を採用する余地はあると思います。

おわりに

フロントエンドのアプリケーションから、Amazon S3 にアップロードする方法をご紹介しました。フロントエンドからサイズの大きなオブジェクトをアップロードする際に、パフォーマンスや信頼性を向上したい時に、使ってみる余地がありそうですね!

最後に宣伝です📣

カミナシでは絶賛採用中です!一緒に最高のサービスを作っていく仲間を募集しています!

参考サイト

今回使用したサンプルコードは Github リポジトリに公開しています。

github.com