自动化处理 Amazon S3 目录存储桶中的对象
通过 S3 批量操作和 AWS Lambda
作者:Cheryl Joseph Aaron Langford Murali Sankar 发布日期:2024年1月12日分类:高级 (300) 主题:亚马逊简单存储服务 (S3) 相关主题:AWS 身份与访问管理 (IAM),AWS Lambda 存储 技术教程
关键要点
数据是一切现代组织的生命线,企业需要在大量数据上执行操作,以加快处理速度。S3 批量操作可自动化 Lambda 处理数百万或数十亿个对象,简化大规模数据处理。使用 S3 Express One Zone 存储类时,应用程序能够实现单毫秒的请求延迟,充分利用批量操作和 Lambda 组合。数据,作为现代组织的核心组成部分,很少处于静态状态。为了支持高性能应用程序和工作负载,企业需要能在大量数据上运行操作,包括根据用例需要进行必要的数据修改,以加速处理过程。这可能包括为上传的图像添加水印、改变音频文件的比特率以优化不同设备的播放或 ETL 处理,其中需要提取、转换并加载各个文件。
最近,我们推出了 Amazon S3 Express One Zone 存储类,其性能比 Amazon S3 Standard 快达 10 倍。多年来,客户成功地使用 Amazon S3 触发 AWS Lambda 函数进行大规模数据处理并实现自定义文件处理逻辑。对于已经在 Amazon S3 中的数据,S3 批量操作 功能可以实现对数百万或数十亿个对象的自动化处理。
在本文中,我们将展示如何使用 S3 批量操作来自动化处理 S3 Express One Zone 中的对象,结合 AWS Lambda,修改并复制对象从一个 S3 目录存储桶到另一个。需要一致的单毫秒请求延迟的性能关键型应用程序,可以利用批量操作进一步加快处理时间。
S3 Express One Zone 和 S3 批量操作
Amazon S3 Express One Zone 是一种新的高性能单可用区 S3 存储类,旨在为最敏感于延迟的应用程序提供一致的单毫秒数据访问。凭借 S3 Express One Zone 完全弹性的存储和几乎无限的规模,您无需提前规划或配置容量或吞吐量要求。用户可以在选择存储 S3 数据的 AWS 区域时,选择特定的 AWS 可用区,将存储和计算资源放置在同一可用区,以进一步优化性能,帮助降低计算成本,快速运行工作负载。依靠 S3 Express One Zone,数据存储在一种不同的存储桶类型中Amazon S3 目录存储桶,支持每秒数十万请求。
使用 S3 批量操作,您可以在特定 S3 对象列表上执行大规模的批量操作。设置 S3 批量操作时,您无需编写代码或部署服务,只需几次点击即可设置作业,随后交由 S3 处理。单个作业可以在包含 exabytes 数据的数十亿个对象上执行指定操作。S3 批量操作跟踪作业进度、发送通知,并存储所有操作的详细完成报告,提供完全托管、可审计和无服务器的体验。您可以通过 AWS 管理控制台、AWS 命令行界面 (CLI)、Amazon SDK 或 REST API 使用 S3 批量操作。
解决方案概述与指导
您可以使用 S3 批量操作在大型 S3 Express One Zone 对象上调用 Lambda 函数,从而为数据处理提供更多选择。当您创建 S3 批量操作作业时,需要提供一个位于 S3 常规用途存储桶中的清单文件,其中包含所有要处理的 S3 Express One Zone 对象的列表。可以通过将 S3 ListObjectsV2 结果导出为 CSV 文件来创建该清单文件。接下来,您可以配置对这些对象执行的 Lambda 操作。当作业开始时,Amazon S3 将为清单中的每个对象调用 Lambda 函数。
要开始,您需要将更新的 AWS SDK 版本和新的负载架构打包到 AWS Lambda 以结合使用 S3 批量操作、S3 Express One Zone 和 AWS Lambda。接下来将介绍以下内容:
负载和 AWS Lambda 函数代码创建一个调用 AWS Lambda 函数的 S3 批量操作作业打包 AWS Lambda 函数1 负载和 AWS Lambda 函数代码
S3 批量操作需要所需的权限以调用 Lambda 函数。您可以使用该指南帮助您 开始使用 S3 批量操作与 Lambda。您还可以使用此处列出的说明来 帮助您创建 S3 批量操作作业。
S3 批量操作引入了一个 新的负载架构 到 Lambda,以帮助简化您使用 S3 批量操作和 Lambda 的体验。这包括在“创建作业”请求中添加 UserArguments 和 InvocationSchemaVersion。要初始化 Lambda 函数,以对目录存储桶中的对象执行自定义操作,您必须将 InvocationSchemaVersion 的值指定为 20。
示例请求和响应对于清单中列出的每个对象,S3 批量操作将调用 Lambda,并使用包含每个对象信息的负载。以下是一个请求的 JSON 示例,其中新的字段用 粗体 标示。
json{ invocationSchemaVersion 20 invocationId Jr3s8KZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZzatx7Ruy job { id ry77cd6061f64a2b8a21d07600c874gf userArguments { MyDestinationBucket lt目标目录存储桶名称gt MyDestinationBucketRegion lt目标目录存储桶区域gt MyDestinationPrefix copied/ MyDestinationObjectKeySuffix newsuffix } } tasks [ { taskId y5R3a2lkZ29lc2hlurcS s3Key ltS3 对象密钥gt s3VersionId null s3Bucket lt源目录存储桶名称gt } ]}
结果代码和结果字符串包含在 S3 批量操作作业报告中。请注意,结果字符串将在作业报告中截断为 1024 个字符。
示例 Lambda 函数用于 S3 批量操作以下示例 Python Lambda 使用用户参数中的数据重命名 S3 对象密钥。然后将这些对象复制到新的目录存储桶中。返回的有效负载包含处理每个对象的结果代码和结果字符串。
将在您的计算机上把此 Lambda 函数保存为 lambdafunctionpy。
pythonimport datetimeimport osimport timeimport boto3from urllib import parsefrom botocoreexceptions import ClientError
def lambdahandler(event context)

# 从 S3 批量操作解析作业参数invocationId = event[invocationId]invocationSchemaVersion = event[invocationSchemaVersion]# 解析用户参数userArguments = event[job][userArguments]destinationBucket = userArguments[MyDestinationBucket]destinationBucketRegion = userArguments[MyDestinationBucketRegion]destinationPrefix = userArguments[MyDestinationPrefix]destinationObjectKeySuffix = userArguments[MyDestinationObjectKeySuffix]# 准备结果results = []s3Client = boto3client(s3 regionname=destinationBucketRegion)for task in event[tasks] taskId = task[taskId] # 解析 Amazon S3 密钥、密钥版本和存储桶名称 s3Key = parseunquote(task[s3Key] encoding=utf8) s3VersionId = task[s3VersionId] s3Bucket = task[s3Bucket] # 构造 CopySource 和 VersionId copySrc = {Bucket s3Bucket Key s3Key} if s3VersionId is not None copySrc[VersionId] = s3VersionId try # 准备结果代码和字符串 resultCode = None resultString = None # 构造新的密钥 newKey = destinationPrefix s3Key destinationObjectKeySuffix # 复制对象到新的存储桶 response = s3Clientcopyobject( Bucket=destinationBucket CopySource=copySrc Key=newKey ) # 标记为成功 resultCode = Succeeded # 结果字符串将在批量操作中截断 resultString = str(response) except ClientError as e # 对所有处理过的异常,标记为临时失败,S3 批量操作将重试该任务。如果收到任何其他异常,标记为永久失败。 print(e) errorCode = eresponse[Error][Code] errorMessage = eresponse[Error][Message] if errorCode == RequestTimeout resultCode = TemporaryFailure resultString = 因请求超时重试请求至 Amazon S3。 elif errorCode == SlowDown resultCode = TemporaryFailure resultString = 因请求减缓重试请求至 Amazon S3。 elif errorCode == 503 Service Unavailable resultCode = TemporaryFailure resultString = 因503服务不可用重试请求至 Amazon S3。 elif errorCode == ServiceUnavailable resultCode = TemporaryFailure resultString = 因服务不可用重试请求至 Amazon S3。 elif errorCode == InternalServerError resultCode = TemporaryFailure resultString = 因内部服务器错误重试请求至 Amazon S3。 elif errorCode == InternalError resultCode = TemporaryFailure resultString = 因内部错误重试请求至 Amazon S3。 else resultCode = PermanentFailure resultString = {} {}format(errorCode errorMessage) except Exception as e print(e) # 捕捉所有异常并永久失败任务 resultCode = PermanentFailure resultString = 异常 {}format(emessage) finally resultsappend({ taskId taskId resultCode resultCode resultString resultString }) return { invocationSchemaVersion invocationSchemaVersion treatMissingKeysAs PermanentFailure invocationId invocationId results results}2 创建一个调用 AWS Lambda 函数的 S3 批量操作作业
创建一个调用 Lambda 函数的 S3 批量操作作业时,您必须提供以下内容:
小熊加速器bear官方版Lambda 函数的 ARN可能包括函数别名或特定版本号具有调用该函数权限的 AWS 身份与访问管理 (IAM) 角色操作参数 “LambdaInvoke”有关如何创建 S3 批量操作作业的更多信息,请参见 创建 S3 批量操作作业 和 S3 批量操作支持的操作 文档。
以下示例使用 AWS CLI 创建一个调用 Lambda 函数的 S3 批量操作作业,并包含新的 UserArguments。您必须使用 2142 或更高版本的 AWS CLI。
bashaws s3control createjob accountid ltAccountIDgt operation {LambdaInvoke {FunctionArnltLambda Function ARNgtInvocationSchemaVersion20 UserArguments {MyDestinationBucket lt目标存储 bucket名称gt MyDestinationBucketRegion lt目标存储 bucket区域gt MyDestinationPrefix copied/ MyDestinationObjectKeySuffix newsuffix}}} manifest {Spec{FormatS3BatchOperationsCSV20180820Fields [BucketKey]}Location{ObjectArnltS3 清单文件 ARNgt ETagltS3 清单文件实体标签gt}} priority 1 rolearn lt批量操作角色 ARNgt noconfirmationrequired report {Bucketlt报告存储桶 ARNgtFormatReportCSV20180820EnabledtruePrefixReportPrefixReportScopeAllTasks}
3 打包 AWS Lambda 函数
要将 Lambda 与新的 S3 目录存储桶集成,您必须使用 13313 版本或更高的 AWS SDK for Python。您可以通过将 Lambda 打包为 zip 文件并上传到 Lambda 进行此操作。该函数还必须拥有复制对象到目标存储桶的必要权限。您可以按照此 打包 Lambda 函数的指南。
将以下策略文件保存在本地计算机上:
示例 trustpolicyjson
json{ Version 20121017 Statement [ { Effect Allow Principal { Service lambdaamazonawscom } Action stsAssumeRole } ]}
示例 s3expresspolicyforlambdajson
json{ Version 20121017 Statement [ { Sid Statement1 Effect Allow Action [ s3expressCreateSession ] Resource [ lt源目录存储桶 ARNgt lt目标目录存储桶 ARNgt ] } ]}
从包含 Lambda 函数和策略文件的相同文件夹中运行以下命令。
bash
创建 IAM 角色和策略
aws iam createrole rolename lambdaex assumerolepolicydocument file//trustpolicyjson
附加策略
aws iam attachrolepolicy rolename lambdaex policyarn arnawsiamawspolicy/servicerole/AWSLambdaBasicExecutionRole
附加与 S3 目录存储桶通信的策略
aws iam putrolepolicy rolename lambdaex policyname s3expresspolicyforlambda policydocument file//s3expresspolicyforlambdajson
打包代码
mkdir packagepip install target /package boto3cd packagezip r /packagezip cd zip packagezip lambdafunctionpy
创建函数
aws lambda createfunction functionname s3batchopsfunction zipfile fileb//packagezip handler lambdafunctionlambdahandler runtime python311 role arnawsiam123456789012role/l
使用 Amazon RDS Proxy 将外部应用程序连接到 Amazon RDS 实例 数据库博客
使用 Amazon RDS Proxy 连接外部应用程序到 Amazon RDS 实例关键要点Amazon RDS Proxy 是一种完全托管的数据库代理,能提高应用程序的可扩展性和安全性。本文介绍如...
实时获取社交媒体洞察,使用亚马逊托管的 Apache Flink 服务和亚马逊 Bedrock 大数
实时获取社交媒体洞察:使用 Amazon 管理的 Apache Flink 与 Amazon Bedrock作者 Francisco Morillo Subham Rakshit 和 Sergio G...