本文将介绍 Serverless CQRS(命令查询职责分离)体系结构,并探讨如何在 Serverless 环境中实现它。Serverless CQRS 是一种以分离职责为基础的软件架构模式,可用于开发分布式系统,其优势在于它可以在不增加部署和管理复杂性的情况下,使应用程序更加可维护和可扩展。
CQRS 基础
在 CQRS 中,应用程序被划分为两个独立的部分。第一部分处理写操作(命令),第二部分负责读操作(查询)。这种分离使应用程序更加灵活,因为可以使用不同的技术和存储选项来处理不同的职责。
例如,对于写操作,我们可能使用一个带有事务支持的关系型数据库。同时,对于查询操作,我们可能使用一个简单的 NoSQL 数据库或者缓存。
此外,CQRS 还提供了一个事件驱动模型,使得我们可以轻松地将其与 Event Sourcing、消息传递和事件总线系统相结合,以便支持可扩展的系统和领域驱动设计模式。
Serverless CQRS
传统的 CQRS 实现需要运行自己的服务器,管理其托管的服务、数据存储和云基础设施等。Serverless 技术可以使 CQRS 更简单,因为它可以减少管理和维护基础设施的成本。同时,Serverless 架构可以根据实际使用量自动缩放,因此可以支持大量的并发操作。
下面我们将探讨如何在 Serverless 环境中实现 CQRS 模式。我们将采用以下技术:
- AWS Lambda:用于处理命令和查询,以及事件驱动模型的事件处理程序。
- AWS API Gateway:用于公开 RESTful API。
- AWS S3:用于持久化事件和命令。
- AWS DynamoDB: 用于存储和检索读模型。
下图演示了 Serverless CQRS 架构。
+--------------+ +----------------+ | API | +-----------------+ | | +- | Gateway | -+ | | | Client | | (Command) | | AWS Lambda | | | +--------------+ | (Command) | +----------------+ +-----------------+ ^ | | | | +------------------+ | +------- | AWS Event Bridge | <------------+ +------------------+ | +-----------------------------------+ +-----------------+ | | | | | | - | AWS Lambda | | | | (Query) | | +----------------+ | +-----------------+ | | AWS S3 | | | +----- | (Event Store) | <------------+ | | +----------------+ | | | | | | | | | | +------------+ | | | | |---------------| AWS Lambda | - | - | AWS DynamoDB | | | (Projection)| | | (Read Model)| | +------------+ | | | | | | | | | | | +-----------------------------------+ +-----------------+
Serverless CQRS 体系结构包括以下组件:
- API Gateway:负责公开 RESTful API,客户端可以使用它发送命令。
- Command Lambda:负责处理命令并将其写入事件存储中。
- Event Store:负责存储事件,它是代码逻辑和设计思想的唯一来源。
- Projection Lambda:负责使用查询模型将事件转换为查询,并将其存储在 DynamoDB 中。
- Query Lambda:负责读取查询模型并返回响应。
实现 Serverless CQRS
准备工作
在开始实现之前,我们需要创建一个 AWS 账户和一个 IAM 管理用户,这将为我们提供 AWS CLI 访问和管理资源的权限。我们还需要安装 AWS CLI。
创建 AWS S3 存储桶
我们需要为事件存储创建 S3 存储桶。请运行以下 AWS CLI 命令:
aws s3api create-bucket --bucket my-bucket --region us-east-1
其中,my-bucket 是存储桶名称,us-east-1 是存储桶所在的 AWS 区域。
部署 Command Lambda
Command Lambda 用于处理客户端发送的命令并将其写入事件存储中。我们可以使用 Node.js 编写 Lambda 函数。请使用以下代码创建一个 Node.js 项目:
mkdir command-lambda cd command-lambda/ npm init -y
接下来,安装 AWS SDK 和 AWS Lambda SDK:
npm install aws-sdk npm install aws-lambda
创建 index.js 文件并添加以下代码:
const AWS = require('aws-sdk'); const s3 = new AWS.S3(); exports.handler = async (event) => { const record = event.Records[0]; const body = JSON.parse(record.body); // Persist event to S3 const params = { Bucket: process.env.EVENT_BUCKET, Key: `${body.type}/${record.messageId}.json`, Body: record.body, }; await s3.putObject(params).promise(); // Publish event to EventBridge const eventBridge = new AWS.EventBridge(); const event = { Source: process.env.EVENT_SOURCE, DetailType: body.type, Detail: record.body, }; await eventBridge.putEvents({ Entries: [event] }).promise(); return { statusCode: 200 }; };
此代码使用 AWS SDK 和 AWS Lambda SDK 访问 S3 和 EventBridge。
该代码是将客户端发送的命令写入事件存储,并将其发布到 EventBridge 的完整实现。
使用 AWS CLI 部署 Lambda 函数,运行以下命令:
aws lambda create-function --function-name command-lambda --runtime nodejs14.x --zip-file fileb://deploy.zip --handler index.handler --role <your-role-arn> --environment Variables="{EVENT_BUCKET=my-bucket,EVENT_SOURCE=my-app}" --memory-size 128 --timeout 5
其中:
--function-name
:Lambda 函数名称。--runtime
:Lambda 函数运行时。--zip-file
:将要部署的代码包。--handler
:Lambda 函数要处理的事件,例如 index.handler。--role
:IAM 角色 ARN,用于授权 Lambda 函数访问其他 AWS 资源。--environment
:指定 Lambda 函数的环境变量。--memory-size
:Lambda 函数的内存大小。--timeout
:Lambda 函数的超时时间。
以上命令将构建和部署到 Lambda,作为一个函数,它的名称是 command-lambda。
部署 Query Lambda
Query Lambda 用于读取查询模型并返回响应。我们可以使用 Node.js 和 AWS SDK 编写 Lambda 函数。请使用以下代码创建一个 Node.js 项目:
mkdir query-lambda cd query-lambda/ npm init -y
安装 AWS SDK 和 AWS Lambda SDK:
npm install aws-sdk npm install aws-lambda
创建 index.js 文件并添加以下代码:
const AWS = require('aws-sdk'); const docClient = new AWS.DynamoDB.DocumentClient(); exports.handler = async (event) => { // Read query model from DynamoDB const params = { TableName: process.env.TABLE_NAME, Key: { id: event.pathParameters.id, }, }; const result = await docClient.get(params).promise(); if (!result.Item) { return { statusCode: 404 }; } const response = { statusCode: 200, headers: { 'Content-Type': 'application/json', }, body: JSON.stringify(result.Item), }; return response; };
使用 AWS CLI 部署 Lambda 函数,运行以下命令:
aws lambda create-function --function-name query-lambda --runtime nodejs14.x --zip-file fileb://deploy.zip --handler index.handler --role <your-role-arn> --environment Variables="{TABLE_NAME=my-table}" --memory-size 128 --timeout 5
部署 Projection Lambda
Projection Lambda 会将事件转换为查询,并将其存储在 DynamoDB 中。我们可以使用 Node.js 和 AWS SDK 编写 Lambda 函数来实现它。请使用以下代码创建一个 Node.js 项目:
mkdir projection-lambda cd projection-lambda/ npm init -y
安装 AWS SDK 和 AWS Lambda SDK:
npm install aws-sdk npm install aws-lambda
创建 index.js 文件并添加以下代码:
const AWS = require('aws-sdk'); const docClient = new AWS.DynamoDB.DocumentClient(); exports.handler = async (event) => { const record = event.Records[0]; const body = JSON.parse(record.body); // Project the event to read model let readModel = {}; if (body.type === 'OrderCreated') { readModel = { 'orderId': body.orderId, 'customer': body.customer, 'createdAt': body.createdAt, }; } else if (body.type === 'OrderUpdated') { const params = { TableName: process.env.TABLE_NAME, Key: { id: body.orderId, }, }; const result = await docClient.get(params).promise(); if (!result.Item) { throw new Error(`Order not found with id ${body.orderId}`); } readModel = { 'orderId': result.Item.orderId, 'customer': result.Item.customer, 'updatedAt': body.updatedAt, }; } else if (body.type === 'OrderDeleted') { const params = { TableName: process.env.TABLE_NAME, Key: { id: body.orderId, }, }; await docClient.delete(params).promise(); return; } else { throw new Error(`Unsupported event type ${body.type}`); } // Write to DynamoDB const params = { TableName: process.env.TABLE_NAME, Item: readModel, }; await docClient.put(params).promise(); };
使用 AWS CLI 部署 Lambda 函数,运行以下命令:
aws lambda create-function --function-name projection-lambda --runtime nodejs14.x --zip-file fileb://deploy.zip --handler index.handler --role <your-role-arn> --environment Variables="{TABLE_NAME=my-table}" --memory-size 128 --timeout 5
部署事件桥
我们需要配置事件桥以将事件从 Command Lambda 传递到 Projection Lambda 和 S3 存储。
请运行以下 AWS CLI 命令:
aws events put-rule --name my-event-pattern --event-pattern "{\"source\":[\"my-app\"],\"detail-type\":[\"OrderCreated\",\"OrderUpdated\",\"OrderDeleted\"]}" aws lambda add-permission --function-name projection-lambda --statement-id ev-r-12345 --action lambda:InvokeFunction --principal events.amazonaws.com --source-arn <arn-of-my-event-pattern> aws lambda add-permission --function-name command-lambda --statement-id ev-r-12346 --action lambda:InvokeFunction --principal events.amazonaws.com --source-arn <arn-of-my-event-pattern> aws events put-targets --rule my-event-pattern --targets "Id"="1","Arn"="<arn-of-projection-lambda>" aws events put-targets --rule my-event-pattern --targets "Id"="2","Arn"="arn:aws:s3:::my-bucket" --input "{\"eventsBus\":\"default\",\"s3Bucket\":\"my-bucket\",\"s3Prefix\":\"events\"}"
其中,my-event-pattern 是事件桥所使用的规则名称,my-app 是事件源名称。
以上步骤将为我们创建事件规则、Lambda 函数授权和 S3 存储。
部署 API Gateway
请运行以下 AWS CLI 命令:
aws apigateway create-rest-api --name my-api aws apigateway create-resource --rest-api-id <your-api-id> --parent-id <parent-resource-id> --path-part orders aws apigateway put-method --rest-api-id <your-api-id> --resource-id <your-resource-id> --http-method POST --authorization-type NONE aws apigateway put-integration --rest-api-id <your-api-id> --resource-id <your-resource-id> --http-method POST --type AWS_PROXY --integration-http-method POST --uri arn:aws:apigateway:<your-region>:lambda:path/2015-03-31/functions/<your-lambda-function-arn>/invocations aws apigateway put-method-response --rest-api-id <your-api-id> --resource-id <your-resource-id> --http-method POST --status-code 200 --response-models "{\"application/json\": \"Empty\"}" aws apigateway create-deployment --rest-api-id <your-api-id> --stage-name dev
此代码将创建一个 RESTful API 和一个 orders 资源。此外,它将创建一个 POST 方法,该方法可将请求发送到 Command Lambda。最后,我们将创建一个部署,以便我们可以测试 API。
测试 API
现在可以测试 API。我们将向 API 发送命令。
请运行以下 AWS CLI 命令:
curl -X POST -H "Content-Type: application/json" -d '{"type":"OrderCreated","orderId":"1","customer":"Alice","createdAt":"2022-01-01T00:00:00Z"}' <your-api-gateway-endpoint>/orders
此命令将创建一个新的订单并将其发布到 S3 和 EventBridge。
查询
我们可以使用 Query Lambda 从 DynamoDB 中读取查询模型。
请运行以下 AWS CLI 命令:
curl -X GET <your-api-gateway-endpoint>/orders/1
该命令将返回查询模型的 JSON 表示形式。
总结
在本文中,我们学习了在 Serverless 环境中实现 CQRS 模式的基本原理和实现方法。我们使用 AWS Lambda、API Gateway、S3 和 DynamoDB 等工具来构建 Serverless CQRS 体系结构。
Serverless CQRS 体系结构使得系统具有更高的可维护性和可扩展性,并节省了管理基础设施的时间和资源。
来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65913d9ceb4cecbf2d674891