在 GraphQL 中使用 Redis Pub / Sub 实现实时数据更新的方法

前言

GraphQL 是一种用于 API 的查询语言和运行时环境,可以让前端开发者更加高效、灵活地查询和获取数据。与传统 RESTful API 不同的是,GraphQL 有独特的 Schema 和 Resolver 组织方式,可以很方便地做到数据的聚合和批量查询。

然而,GraphQL 却不支持类似于 WebSocket 的实时数据传递。在某些场景下,如在线多人游戏、在线聊天等,需要实时更新数据。那么,本文将介绍使用 Redis Pub / Sub(发布订阅)实现 GraphQL 实时数据更新的方法。

Redis Pub / Sub 简介

Redis 是一个基于内存的高性能键值数据库。其中,Redis Pub / Sub 是 Redis 的一种非常有用的功能,它提供了发布订阅模式的消息传递。

在 Redis Pub / Sub 中,发布者不会直接把消息发送给订阅者,而是将消息发送给一个中间层 - 通道。而订阅者则订阅这些通道,当有消息发布时,所有订阅该消息的订阅者都能够接收到。Redis Pub / Sub 就这样通过通道实现了消息的分发。

Redis Pub / Sub 在 GraphQL 中的应用

Redis Pub / Sub 可以用于提供实时数据更新服务。GraphQL 中可以通过订阅查询来达到这一目的。

  1. 首先,我们需要在 Redis 中创建一个消息通道:
const Redis = require('ioredis');
const redis = new Redis();

const pubsub = new Redis();

const MESSAGE_CHANNEL = 'messages';

// 创建消息通道
pubsub.subscribe(MESSAGE_CHANNEL, (err, count) => {
  if (err) {
    throw new Error('Redis: could not subscribe to channel');
  }
  console.log(`Redis: subscribed to ${count} channel.`);
});
  1. 接下来,我们需要在 GraphQL 的 Schema 中定义相应的 Subscription 类型。该类型以与 Query 和 Mutation 类似的方式定义,从订阅频道接收数据,然后将其作为 GraphQL 类型返回:
const typeDefs = `
  type ChatMessage {
    id: ID!,
    text: String!,
    sender: String!
  }

  type Subscription {
    messageUpdate: ChatMessage!
  }
`;
  1. 然后,在 GraphQL Resolver 中订阅 Redis 消息通道,并返回一个 AsyncIterator,该 AsyncIterator 将无限期地等待来自 Redis 消息通道的消息:
const resolvers = {
  Subscription: {
    messageUpdate: {
      subscribe: () => pubsub.asyncIterator(MESSAGE_CHANNEL),
    },
  },
};
  1. 最后,在 GraphQL 中触发订阅查询。这可以使用任何 GraphQL 客户端库来实现,例如 Apollo Client。
const MESSAGE_SUBSCRIPTION = gql`
  subscription {
    messageUpdate {
      id
      text
      sender
    }
  }
`;

// 触发订阅查询
const subscriptionClient = new SubscriptionClient('ws://localhost:3000', {
  reconnect: true,
});

const client = new ApolloClient({
  link: new WebSocketLink(subscriptionClient),
  cache: new InMemoryCache(),
});

const { data } = await client.subscribe({ query: MESSAGE_SUBSCRIPTION });

示例代码

下面是一个完整的示例代码,用于演示 Redis Pub / Sub 如何在 GraphQL 中实现实时数据更新。

const Redis = require('ioredis');
const express = require('express');
const { ApolloServer, gql } = require('apollo-server-express');
const { createServer } = require('http');
const { execute, subscribe } = require('graphql');
const { SubscriptionServer } = require('subscriptions-transport-ws');


const pubsub = new Redis();
const MESSAGE_CHANNEL = 'messages';

// 创建消息通道
pubsub.subscribe(MESSAGE_CHANNEL, (err, count) => {
  if (err) {
    throw new Error('Redis: could not subscribe to channel');
  }
  console.log(`Redis: subscribed to ${count} channel.`);
});

// 定义 GraphQL Schema
const typeDefs = gql`
  type ChatMessage {
    id: ID!,
    text: String!,
    sender: String!
  }

  type Query {
    messages: [ChatMessage]
  }

  type Mutation {
    sendMessage(text: String!, sender: String!): ChatMessage!
  }

  type Subscription {
    messageUpdate: ChatMessage!
  }
`;

// 定义 GraphQL Resolver
const resolvers = {
  Query: {
    messages: () => [],
  },
  Mutation: {
    sendMessage: async (_, { text, sender }) => {
      // 发送消息到 Redis 消息通道
      const id = Math.random().toString(36).slice(2);
      await pubsub.publish(MESSAGE_CHANNEL, JSON.stringify({ id, text, sender }));

      return { id, text, sender };
    },
  },
  Subscription: {
    messageUpdate: {
      subscribe: () => pubsub.asyncIterator(MESSAGE_CHANNEL),
    },
  },
};

// 创建一个 Apollo Server
const server = new ApolloServer({
  typeDefs,
  resolvers,
  subscriptions: {
    path: '/subscriptions',
    onConnect: () => console.log('Client subscribed'),
    onDisconnect: () => console.log('Client disconnected'),
  },
});

const app = express();
server.applyMiddleware({ app });

const httpServer = createServer(app);

httpServer.listen({ port: 3000 }, () => {
  console.log(`🚀 Server ready at http://localhost:3000${server.graphqlPath}`);
  console.log(`🚀 Subscriptions ready at ws://localhost:3000${server.subscriptionsPath}`);
});

// 创建一个 Subscription Server
const subscriptionServer = SubscriptionServer.create(
  {
    schema: server.schema,
    execute,
    subscribe,
    onConnect: () => console.log('Client connected'),
    onDisconnect: () => console.log('Client disconnected'),
  },
  {
    server: httpServer,
    path: server.subscriptionsPath,
  }
);

在上面的代码中,我们创建了一个包含订阅查询的 GraphQL Schema,并实现了一个 Mutation,用于向消息通道发布消息。

在 GraphQL Resolver 中,我们使用 Redis Pub / Sub 订阅消息通道,并将订阅查询的结果作为 AsyncIterator 返回。

最后,我们通过一个 HTTP 服务器和一个 WebSocket 服务器来启动 Apollo Server 和 Subscription Server。

总结

本文介绍了在 GraphQL 中使用 Redis Pub / Sub 实现实时数据更新的方法。我们首先了解了 Redis Pub / Sub 的工作原理,然后展示了如何使用 Redis Pub / Sub 在 GraphQL 中实现实时数据更新。

通过使用 Redis Pub / Sub 和 GraphQL,可以轻松地实现实时数据更新,从而满足更多场景的需求。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65b54881add4f0e0ffe12232