如何使用 Serverless 框架中的 Step Functions 进行异步任务处理和流程控制

引言

随着云计算的普及,Serverless 框架已经成为了当前流行的开发方式之一。Serverless 框架中的 AWS Lambda 函数可以快速响应请求,执行针对不同事件的逻辑。然而,在实际开发过程中,我们也会遇到一些异步任务和需要流程控制的场景,这时候 AWS Step Functions 架构就可以很好的解决这些问题。本文将向大家介绍如何使用 Serverless 框架中的 Step Functions 进行异步任务处理和流程控制。

什么是 AWS Step Functions

AWS Step Functions 是一个为构建面向事件驱动的应用程序而优化的服务器端less工作流服务 。AWS Step Functions 可以结合 AWS Lambda、Amazon SNS 等多种 AWS 服务使用,从而使得开发者可以轻松地创建、部署并运行工作流。特别是对于使用了多个服务的应用来说,AWS Step Functions 可以使得这些服务间操作的协调变得更加快捷简单。

AWS Step Functions 的优势

使用 AWS Step Functions 的最大优势在于,能够将云应用的过程抽象成一系列状态机,方便处理和管理,同时也大大降低了应用的复杂度。具体来说,AWS Step Functions 可以让应用的执行流程变得更加清晰、更易于管理,包括:

  1. 事件驱动:AWS Step Functions 采用事件驱动 (Event-Driven),可以自动处理和分配请求。
  2. 时序控制:AWS Step Functions 可以根据时间规则和状态切换条件,控制各个状态间的转换。
  3. 图形化编辑:AWS Step Functions 具有用户友好界面,支持图形化配置状态树、数据流、事件处理、异常处理等。
  4. 明确目的:AWS Step Functions 可以将异步任务,如计算、API 调用、数据处理等操作封装成组件并以后可重用。
  5. 弹性伸缩:AWS Step Functions 与 AWS Lambda 结合使用,可以实现动态自适应性能。

如何使用 AWS Step Functions

使用 AWS Step Functions 的整个流程可以大致分为以下几个步骤:

  1. 定义并创建状态机:状态机由一组状态和状态转换组成,定义状态机即根据需要创建这些状态和转换。
  2. 触发状态机:触发某个状态机则相当于调用一段可重用的程序,提供一些参数和数据输入。
  3. 状态机执行:根据前面步骤中定义的状态机,AWS 执行相应的状态转化,把输入的参数数据传递给对应的环节处理
  4. 处理异步调用:使用 AWS Lambda 资源异步完成调用的任务。
  5. 处理结果输出:让状态机上的 Lambda 返回结果、或者集成其他资源,并将输出结果保存到 S3、DynamoDB 等。

下面,我们将具体介绍如何应用 AWS Step Functions 构造一个电话预约的示例。

构造电话预约系统的示例

在本示例中,我们将使用 AWS Lambda 和 AWS Step Functions 实现一个电话预约的示例。具体内容包括:

  1. 记录用户提供的电话预约信息
  2. 调用第三方短信服务发送信息给接收方
  3. 等待接收方的回复
  4. 根据回复的结果判断是否预约成功

定义状态机

在定义状态机的过程中,我们将根据具体需要安排四个状态:

  1. bookAppointment:记录用户提供的电话预约信息。
  2. sendAppointmentSMS:调用第三方短信服务,并发送预定信息。
  3. waitAppointmentReply:等待接收方的回复信息,一段时间后判断回复状况。
  4. end:根据判断的结果输出预约结果并结束。

定义状态图如下图所示:

其中,waitAppointmentReply 状态会在 Lambda 中调用延时函数,等待接收方的回复。

编写 Lambda 函数

在具体实现过程中,我们需要编写以下 4 个 Lambda 函数:

  1. bookAppointment:记录用户提供的电话预约信息,并将信息传递给 sendAppointmentSMS。
  2. sendAppointmentSMS:调用第三方短信服务,并发送预约信息。
  3. waitAppointmentReply:调用延时函数,并等待接收方的回复。
  4. end:根据回复的结果输出预约结果并结束。

我们将在以下代码中详细介绍这些函数的实现过程。

bookAppointment:记录用户提供的电话预约信息

const AWS = require("aws-sdk");
const dynamo = new AWS.DynamoDB.DocumentClient();

exports.handler = async (event) => {
  let params = {
    TableName: "appointments",
    Item: {
      appointmentId: event.appointmentId,
      appointmentDate: event.appointmentDate,
      appointmentTime: event.appointmentTime,
      appointmentPhoneNumber: event.appointmentPhoneNumber
    }
  };

  try {
    await dynamo.put(params).promise();
    return { success: true };
  } catch (err) {
    console.log(err);
    return { success: false, error: err };
  }
};

bookAppointment 函数的作用是将用户提供的预约信息存储到 dynamoDB 数据库中。

sendAppointmentSMS:调用第三方短信服务

const AWS = require("aws-sdk");
const sms = new AWS.SNS({ region: "us-east-1" });

exports.handler = async (event) => {
  let params = {
    Message: `您已成功预约 ${event.appointmentDate} ${event.appointmentTime},我们将联系您确认.`,
    PhoneNumber: event.appointmentPhoneNumber
  };

  try {
    await sms.publish(params).promise();
    return { success: true };
  } catch (err) {
    console.log(err);
    return { success: false, error: err };
  }
};

sendAppointmentSMS 函数的作用是调用第三方短信服务,向接收方发送预定信息。

waitAppointmentReply:等待接收方的回复

const AWS = require("aws-sdk");
const stepFunctions = new AWS.StepFunctions();

exports.handler = async (event) => {
  let waitTime = event.waitTime || 300;
  let stateMachineArn = process.env.STATE_MACHINE_ARN;

  let params = {
    stateMachineArn: stateMachineArn,
    input: JSON.stringify(event),
    name: event.appointmentId
  };

  await wait(waitTime);

  return await stepFunctions.sendTaskSuccess(params).promise();
};

function wait(timeout) {
  return new Promise(resolve => setTimeout(resolve, timeout * 1000));
}

waitAppointmentReply 函数的作用是调用延时函数,等待接收方的回复。

end:根据回复的结果输出预约结果并结束

const AWS = require("aws-sdk");
const dynamo = new AWS.DynamoDB.DocumentClient();

exports.handler = async (event) => {
  let params = {
    TableName: "appointments",
    Key: {
      appointmentId: event.appointmentId
    }
  };

  let appointmentData = await dynamo.get(params).promise();

  if (appointmentData.Item.appointmentConfirmed) {
    return { success: true };
  } else {
    return { success: false };
  }
};

end 函数的作用是输出预约结果,并结束流程。

定义 Step Functions 状态机

可以使用 AWS 管理控制台、AWS CLI 或者 CloudFormation 进行定义和创建状态机。这里我们选用 AWS 管理控制台进行示例。

登录 AWS 管理控制台

在 AWS 管理控制台中输入关键字 “Step Functions”,单击进入进行服务的配置。

定义 Step Functions 状态机

在 新建状态机 中选择 Step Functions 模板,您将看到两个选项:

  1. 可视化:按照自定义流程设定的状态树,可以快速地组装应用程序。
  2. 代码:用 AWS stepfunctions 架构语音对状态树的模型代码定义。

本示例中,我们选择 可视化 模板,按照之前的状态图添加对应的状态和转换,选择对应的 Lambda 函数和参数进行配置。

在此步骤中,我们为时等待状态 waitAppointmentReply 配置了一个“时钟时间触发”。该延时值表示等待接收方回复的时间,可以根据实际场景进行修改。

运行 Step Functions 状态机

要在 Lambda 控制台中运行 Step Functions 状态机,可以通过运行代码本身、AWS 管理控制台或 AWS SDK 来触发状态机。在本示例中,我们将直接使用 AWS 管理控制台进行操作。

在控制台中,我们需要设置输入参数,单击启动即可运行状态机。在运行状态机的初始流程中,将会出现四个状态节点。

单击每个状态节点,可以查看其执行结果和相关日志信息,对出错的状态进行进一步排查。

总结

AWS Step Functions 可以帮助我们更好地处理和管理异步任务和流程控制。在本文中,我们介绍了如何使用 AWS Step Functions 结合 AWS Lambda 进行电话预约流程的示例。状态机可以方便地管理和控制整个流程,同时使得代码更易于维护和扩展。希望通过这篇文章的介绍,读者们可以更好地理解 AWS Step Functions 的工作方式,积累一些开发经验,在实际应用中发挥更大的价值。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65bb265dadd4f0e0ff3c53bf