Serverless 架构是近年来前端开发领域内的趋势,其优势在于可以以低成本、高效率、高可靠性的方式创建、部署、运行应用程序。利用 Serverless 架构可以快速搭建出一个数据采集与清洗系统,本文将介绍如何使用 AWS Lambda、S3 和 DynamoDB,实现一个 Serverless 的数据采集与清洗系统。
Serverless 架构概述
在传统架构中,需要自己购买服务器、配置环境、运维等,这些都需要占用时间和费用。而在 Serverless 架构中,这个过程可以被自动化,只需要编写代码,无需担心服务器的操作系统和网络环境等问题,因为服务提供商会帮你处理这些。Serverless 架构的优势在于简化了应用程序开发、运维以及成本。
服务组件
AWS Lambda
AWS Lambda 是一种无服务器计算服务,可以让您运行代码来响应事件,并且以所使用的计算资源付费。Lambda 使用函数代码来响应事件,可以在 Lambda 控制台中创建函数,也可以从您的本地计算机上传函数代码,它可以处理 HTTP 请求,处理 S3 中上传的文件,并且可以与许多其他服务集成。
Amazon S3
Amazon Simple Storage Service(S3) 是一种对象存储服务,因为它可以存储和检索无限数量的数据。此外,Amazon S3 支持极其大的数据对象和高扩展性,它可以存储对象并通过一个唯一的键进行检索,即所谓的对象键。
Amazon DynamoDB
Amazon DynamoDB 是一种完全托管的、多区域、高可靠性的 NoSQL 数据库服务,以保证任何规模和任何负载下的应用程序的性能和可伸缩性。可以存储和检索任意数量的数据,并能以低延迟读取指定的任何项。
采集与清洗过程
下面将介绍 Serverless 的数据采集与清洗过程,主要分为以下几步:
- 在 S3 中创建一个存储桶,用于存储原始数据源。上传原始数据的数据源文件。
- 创建一个 Lambda 函数,该函数在发现原始数据文件时会自动触发。现在,我们需要使用 Lambda 函数阅读该文件并将其写入 DynamoDB。
- 创建 DynamoDB 表,该表将用于存储清理过的数据。
- 创建一个 Lambda 函数,该函数负责解析 DynamoDB 中的数据,进行清理和转换,并将其写入新的 DynamoDB 表中。
步骤 1:在 S3 中创建一个存储桶和上传文件
首先,我们需要在 S3 中创建一个存储桶,并将数据源文件上传到存储桶中。这里需要做的事情如下:
- 登录 AWS 后台,进入 S3 服务,并在左上角选择「创建存储桶」按钮,创建名为
example-bucket
的新存储桶。 - 选择新创建的存储桶,点击「上传」按钮来上传数据源文件到存储桶中。
步骤 2:创建一个 Lambda 函数,用于处理 S3 事件
现在,我们需要创建一个 Lambda 函数,在 S3 中发现新文件时自动触发。这里,Lambda 函数将自动读取 S3 中的文件,并将其写入 DynamoDB 表,目录结构如下所示:
example-bucket/ |__ data.csv |__ cleaned/ |__ data.json
这里是 Lambda 函数的示例代码(将数据从 CSV 文件中读取并将其插入到 DynamoDB 表中):
// javascriptcn.com code example const AWS = require('aws-sdk'); const s3 = new AWS.S3(); const ddb = new AWS.DynamoDB.DocumentClient(); exports.handler = async (event, context) => { const srcBucket = event.Records[0].s3.bucket.name; const srcKey = event.Records[0].s3.object.key; const rawFile = await s3.getObject({Bucket: srcBucket, Key: srcKey}).promise(); // Read CSV const csvData = rawFile.Body.toString('utf8'); const lines = csvData.split('\n'); const headers = lines[0].trim().split(','); for (let i = 1; i < lines.length; i++) { const columns = lines[i].trim().split(','); const item = {}; for (let j = 0; j < headers.length; j++) { item[headers[j]] = columns[j]; } // Insert into DynamoDB await ddb.put({TableName: 'RawData', Item: item}).promise(); } };
步骤 3:创建一个 DynamoDB 表
现在,我们需要创建一个 DynamoDB 表,用于存储清理和转换过的数据。这个表将具有以下属性:
- Partition Key:
id
,用于唯一标识存储在表中的每个数据行。 - Sort Key:
created_at
,存储每个数据行的创建时间。
// javascriptcn.com code example const AWS = require('aws-sdk'); const ddb = new AWS.DynamoDB(); exports.handler = async () => { const tableName = 'CleanedData'; const exists = await ddb.describeTable({TableName: tableName}).promise().catch(() => null); if (!exists) { await ddb.createTable({ TableName: tableName, AttributeDefinitions: [ {AttributeName: 'id', AttributeType: 'S'}, {AttributeName: 'created_at', AttributeType: 'N'} ], KeySchema: [ {AttributeName: 'id', KeyType: 'HASH'}, {AttributeName: 'created_at', KeyType: 'RANGE'} ], ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1} }).promise(); } };
步骤 4:创建一个 Lambda 函数,用于处理和转换数据
现在,我们需要创建另一个 Lambda 函数,该函数将从上一个步骤中创建的 DynamoDB 表中读取数据,并将其清理和转换。这里以转换为 JSON 为例,示例代码如下:
// javascriptcn.com code example const AWS = require('aws-sdk'); const ddb = new AWS.DynamoDB.DocumentClient(); exports.handler = async () => { const scan = await ddb.scan({TableName: 'RawData'}).promise(); for (const item of scan.Items) { item.created_at = Date.now(); // Do some data cleaning work const cleanedItem = {id: item.id, data: JSON.stringify(item)}; await ddb.put({ TableName: 'CleanedData', Item: cleanedItem }).promise().catch(() => null); } };
步骤 5:运行 Lambda 函数
现在,我们一切就绪,可以通过 AWS 控制台调用 Lambda 函数来运行这个 Serverless 的应用程序了。
在 AWS Lambda 控制台,选择左侧菜单中的「函数」,找到你创建的两个函数,分别为 data-collector-from-s3
和 data-cleaner-to-dynamo
。然后,你可以测试你的函数,或者使用 CloudWatch 触发器定期触发这些函数。
结论
Serverless 架构的应用程序开发是一个非常值得尝试的方法,它减少了成本和开发时间,让开发者可以更专注于应用程序本身的开发。本文所介绍的 Serverless 数据采集与清洗系统只是一个示范,你可以根据具体的场景和需求进行修改和扩展。
在 Serverless 的架构下,应用程序在运行时无需担心底层的物理基础设施和网络环境,让开发者可以更加关注业务逻辑。这是一个非常好的选择,如果你还没有尝试过 Serverless 架构,可以考虑一下。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/6737f56c317fbffedf0d612c