如何在 Serverless 中使用 Kinesis 和 Lambda

阅读时长 9 分钟读完

在 Serverless 架构中,AWS Lambda 是一种常见的无服务器计算服务,用于自动扩展应用程序并消除运维成本。与之类似的还有数据流处理服务 Kinesis,它可以支持实时数据注入和处理。在本文中,我们将探讨如何在 Serverless 中使用 Kinesis 和 Lambda。

Kinesis 和 Lambda 的结合

Kinesis 是一种高度可伸缩的流式数据处理服务,可用于读取和写入大型、分散、高吞吐量的数据流。它可以处理和存储来自 Web 和移动应用程序、媒体和 IoT 设备等各种来源的数据,并在实时或稍后进行处理。

Lambda 是一个事件驱动的计算服务,可以自动扩展以处理各种事件,例如 HTTP 请求、S3 对象创建或 DynamoDB 变更等事件。Kinesis 和 Lambda 的结合可以实现一些有趣的用例,例如:

  • Real-time processing: 对来自 Kinesis 数据流的大量数据进行实时处理。
  • ETL pipelines: 从 Kinesis 流中提取数据并将其转换为其他系统可以使用的格式,例如 Elasticsearch 或 S3 存储桶。
  • Stream processing: 执行连续处理,如窗口聚合、流连接和时间序列分析。

Serverless Framework 的部署

我们将使用 Serverless Framework 来部署 Lambda 和 Kinesis 管道。 在开始之前,您需要确保按照 AWS 文档的要求设置您的 AWS 访问凭据。现在,让我们安装和配置 Serverless CLI:

作为第一步,我们需要将 Serverless Framework 插件视为依赖项,以便能够更轻松地部署 Kinesis 管道。

在这个课程中,我们将处理数据流 Kinesis Firehose。Firehose 是一种 Kinesis 服务,它可以从数据源中不间断地获取数据,并将其发送到目标存储区。这些存储区可以是 Amazon S3、Redshift 或 Elasticsearch 等服务。

Lambda 函数

现在,我们将为一个 Serverless Lambda 函数部署一个新服务。这个函数将通过 AWS API Gateway 暴露一个 HTTP 端点,接收推送到 Kinesis Firehose 的数据,并将它们处理为我们所需要的格式。在本例中,使用了 Node.js 运行时和 serverless-http 插件。

使用以下命令来创建 HTTP API Gateway 绑定的 Lambda 函数,并将函数部署到 AWS:

-- -------------------- ---- -------
-------- ----------

---------
  ----- ---
  -------- ----------
  ------- ---------

----------
  ----
    -------- -----------
    -------
      - -----
          ----- -
          ------- ----
          ----- ----

--------
  - ---------------
-- -------------------- ---- -------
----- ------ - ------------------

------------------ - ----- ------- -- -
  ----- - ------------------ ------ - - ---------------------

  --------------------- ------- -----------------------
  -------------------- ------------

  ------------------------------ -- -
    ----- ------- - ------------------------ ----------------------------
    ----- ---- - --------------------
    ----- --------- - --------------------------------------------

    ----- ------ - -
      --- --------
      ----------
      -------- -------------
    --

    -----------------------------------------
  ---

  ------ -
    ----------- ----
    ----- -------------------
  --
--

使用 Kinesis Firehose 管道

首先,让我们通过 CloudFormation 声明一个 Firehose Delivery Stream。

-- -------------------- ---- -------
----------
  -----------------
    ----- --------------------------------------
    -----------
      ------------------- ------------------------------------------
      ------------------- ---------
      ---------------------------
        -------- ------- ----------------
        ---------- --------------------------------------------------------------
        ------- -------
        ---------------
          ------------------ --
          ---------- -
        ------------------ ----
        -------------------------
          -------- ----
          ------------- --------------------------------
          -------------- ----- ----- ----- ----------------- ------------------

这段 CloudFormation 中的代码创建了一个名为 kinesis-firehose-processor-delivery-stream 的 Kinesis Firehose Stream,它会将记录写入到一个名为 kinesis-firehose-processor-destination-bucket 的 Amazon S3 存储桶中。在 S3DestinationConfiguration 属性中,我们为 S3 存储桶配置了前缀、缓冲配置和压缩格式等设置。

然后,我们需要创建一个 AWS Lambda 来作为 Kinesis Firehose 处理程序。打开 serverless.yml 文件并添加以下内容:

这会在我们部署的 AWS 存储桶和 Kinesis Firehose Stream 之间建立一个平台。当有新的数据到达 Kinesis Firehose Stream 时,Lambda 函数将自动触发并将数据转换为我们指定的格式,并将其写入到前面声明的 Amazon S3 存储桶中。

-- -------------------- ---- -------
----- ---- - ----------------

------------------------------- - ----- ------- -------- -- -
  ----- ------- - ---

  --- ------ ------ -- -------------- -
    ----- ------- - ------------------------ ----------
    ----- ------------ - -------------------------
    ----- ---------- - -------------------------------
    ----- ------- - -----------------------

    ----- ----------- - -
      --- -----------
      -------- ----------------
    --

    ----- ------ - -
      --------- ----------------
      ------- -----
      ----- ------------------------------------------------------------
    --

    ---------------------
  -

  ------ -
    -------- --------
  --
--

结论

AWS 还提供了一些其他的 Stream 服务,例如 Kinesis Analytics、Kinesis Video Streams 等等。Serverless 框架可以帮助我们轻松构建这些流处理管道,让我们将注意力更多地放在数据分析和处理上,而不是架构和管理。本文所述的例子只是 AWS Kinesis 和 Lambda 的一种简单组合,还有许多其他有趣的应用程序等着我们去挖掘。

来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/670254a9d91dce0dc8472295

纠错
反馈