RxJS + Koa.js 实现 WebSocket 单页应用全栈实践

WebSocket 是一种基于 TCP 协议的全双工通信协议,在前端开发中被广泛应用于实时通信、实时数据更新等场景。RxJS 是一个响应式编程库,可以将异步数据流以类似于数组的形式进行处理,提供了强大的操作符和工具函数。Koa.js 是一个基于 Node.js 的 Web 框架,提供了简洁的 API 和中间件机制,支持异步流程控制。

本文将介绍如何使用 RxJS 和 Koa.js 实现 WebSocket 单页应用全栈实践,包括 WebSocket 服务器的搭建、前端页面的开发、数据流的处理和数据持久化等方面。本文的示例代码基于 TypeScript 编写,适合有一定 TypeScript 基础和 RxJS 使用经验的读者。

WebSocket 服务器的搭建

在使用 WebSocket 之前,我们需要先搭建一个 WebSocket 服务器。下面是使用 Koa.js 和 WebSocket 库实现的简单 WebSocket 服务器:

import Koa from 'koa';
import WebSocket from 'ws';

const app = new Koa();
const server = app.listen(3000);

const wss = new WebSocket.Server({ server });

wss.on('connection', (ws: WebSocket) => {
  console.log('client connected');

  ws.on('message', (message: string) => {
    console.log(`received message: ${message}`);

    ws.send(`echo: ${message}`);
  });

  ws.on('close', () => {
    console.log('client disconnected');
  });
});

上面的代码中,我们先创建了一个 Koa 应用,并监听了 3000 端口。然后创建了一个 WebSocket 服务器,并将其绑定到 Koa 应用的 HTTP 服务器上。在客户端连接成功后,我们会输出一条日志,并监听客户端发来的消息。当收到消息后,我们会将其原样返回给客户端,并输出一条日志。在客户端断开连接时,我们也会输出一条日志。

前端页面的开发

在前端页面中,我们需要使用 WebSocket API 连接到服务器,并发送和接收消息。下面是一个简单的前端页面的示例代码:

<!DOCTYPE html>
<html>
  <head>
    <meta charset="UTF-8" />
    <title>WebSocket Demo</title>
  </head>
  <body>
    <input type="text" id="input" />
    <button id="send">Send</button>
    <ul id="messages"></ul>
    <script>
      const socket = new WebSocket('ws://localhost:3000');

      socket.addEventListener('open', (event) => {
        console.log('connected');
      });

      socket.addEventListener('message', (event) => {
        const message = event.data;

        const li = document.createElement('li');
        li.textContent = message;

        document.getElementById('messages').appendChild(li);
      });

      document.getElementById('send').addEventListener('click', () => {
        const input = document.getElementById('input');
        const message = input.value;

        socket.send(message);

        input.value = '';
      });
    </script>
  </body>
</html>

上面的代码中,我们先创建了一个 WebSocket 对象,指定了服务器的地址和端口号。在连接成功后,我们会输出一条日志。当收到服务器的消息时,我们会创建一个新的 <li> 元素,并将消息内容插入其中,然后添加到消息列表中。在点击发送按钮时,我们会获取输入框中的文本内容,并通过 WebSocket 发送给服务器。

数据流的处理

在使用 RxJS 处理 WebSocket 数据流时,我们需要先将 WebSocket 对象转换为一个可观察对象。然后使用操作符对数据流进行处理,最后将处理后的数据发送给客户端。下面是一个简单的数据流处理示例:

import WebSocket from 'ws';
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const wss = new WebSocket.Server({ port: 3000 });

wss.on('connection', (ws: WebSocket) => {
  console.log('client connected');

  const messages$ = fromEvent(ws, 'message').pipe(
    map((event: WebSocket.MessageEvent) => event.data)
  );

  messages$.subscribe((message) => {
    console.log(`received message: ${message}`);

    ws.send(`echo: ${message}`);
  });

  ws.on('close', () => {
    console.log('client disconnected');
  });
});

上面的代码中,我们使用了 RxJS 中的 fromEvent 操作符将 WebSocket 对象转换为一个可观察对象。然后使用 map 操作符对收到的消息进行处理,将其转换为字符串类型。在收到消息后,我们会输出一条日志,并将处理后的消息返回给客户端。在客户端断开连接时,我们也会输出一条日志。

数据持久化

在实际应用中,我们通常需要将收到的消息保存到数据库中,以便后续查询和分析。下面是一个使用 MongoDB 存储消息的示例:

import mongoose from 'mongoose';
import WebSocket from 'ws';
import { fromEvent } from 'rxjs';
import { map, tap } from 'rxjs/operators';

mongoose.connect('mongodb://localhost:27017/websocket-demo', {
  useNewUrlParser: true,
  useUnifiedTopology: true,
});

const messageSchema = new mongoose.Schema({
  content: String,
  createdAt: { type: Date, default: Date.now },
});

const Message = mongoose.model('Message', messageSchema);

const wss = new WebSocket.Server({ port: 3000 });

wss.on('connection', (ws: WebSocket) => {
  console.log('client connected');

  const messages$ = fromEvent(ws, 'message').pipe(
    map((event: WebSocket.MessageEvent) => event.data),
    tap(async (content) => {
      const message = new Message({ content });
      await message.save();
    })
  );

  messages$.subscribe((message) => {
    console.log(`received message: ${message}`);

    ws.send(`echo: ${message}`);
  });

  ws.on('close', () => {
    console.log('client disconnected');
  });
});

上面的代码中,我们使用了 Mongoose 库连接到 MongoDB 数据库,并定义了一个 Message 模型。在收到消息后,我们会将其保存到数据库中,并输出一条日志。在客户端断开连接时,我们也会输出一条日志。

总结

本文介绍了如何使用 RxJS 和 Koa.js 实现 WebSocket 单页应用全栈实践,包括 WebSocket 服务器的搭建、前端页面的开发、数据流的处理和数据持久化等方面。本文的示例代码基于 TypeScript 编写,适合有一定 TypeScript 基础和 RxJS 使用经验的读者。在实际应用中,我们可以根据具体需求进行更加复杂的数据流处理和数据持久化操作,以实现更加灵活和高效的 WebSocket 应用。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/658c46c6eb4cecbf2d1ab2c2


纠错
反馈