前言
在实际开发中,数据通常不仅仅存在于一个地方,而且大多数应用的数据存储通常是重要的资产。而关系型数据库在数据存储方面被广泛使用,但是与此同时,NoSQL 数据库的流行也日益增长。除此之外,一些内存数据库 (如 Redis)也被广泛使用,以实现高性能读写操作。
在现代应用中,事务性数据的一致性通常是非常重要的。这意味着当一个事务发生更改时,应用中所有的其他部分也应该看到它。然而,在多个数据存储之间保持一致性是不容易的。
在这篇文章中,我们将讨论 Redis 与数据库之间的同步机制,以及如何保持多个数据源之间的一致性。我们将展示 Redis 中内建的同步功能,这有助于维护多个数据存储之间的一致性。我们还将进行讨论,以便根据不同的用例选择适当的同步策略。
Redis 的同步机制
Redis 是一种内存数据库,它提供了高性能的读写操作。Redis 可以作为主数据库或从数据库使用,也可以与其他数据库一起使用。多数使用场景下,Redis 用于缓存或作为主数据库用于读写操作。而它可以与关系数据库一起使用,以实现持久化,也可将 Redis 与同样支持内存缓存的其他数据存储一起使用。
Redis 有建立在“发布订阅” 机制上的原生“ Replication”(复制 / 同步)功能。这允许 Redis 主节点将更改发送到一个或多个从节点。从节点在接收到这些更改后会自动将其与本地数据同步。这是 Redis 提供的一种强大的数据复制功能,很容易在 Redis 环境中实现高可用性和高容错性。这个功能在 Redis 中是默认开启的,需要注意的是,由于主节点的写入操作必须被从节点接受之后才能算是成功,因此从节点与主节点之间需要及时的同步操作。
启用 Redis 复制之后,从节点会按照主节点的日志文件来同步主节点的写入指令,完成数据的一致性复制。如果从节点丢失了日志文件的任何一部分,则从节点将无法再次恢复。此外,如果存在网络分区(Network partition),则一些从节点可能无法与主节点通信并停止复制。这就是为什么需要对从节点进行监视,并启用自动故障迁移(Automatic Failover)以确保高可用性的原因。
Redis 缓存和数据库同步
如上所述,Redis 通常用作缓存,以减少读取高效数据的时间并减轻主数据库的压力。因为 Redis 提供了快速的读取操作,将数据缓存到 Redis 中可以最大程度地减少数据库的 I/O 操作,从而提高应用程序的性能。
当 Redis 缓存被用于读取操作时,可以放心大胆地使用它而不必担心同步问题。因为大多数情况下,Redis 缓存仅仅是读操作而已。通常,这样的缓存操作不需要与主数据库同步。
然而,当 Redis 用于读写数据时,就必须进行同步操作。因为 Redis 数据是保存在内存中的,如果不及时保存到持久化存储介质(如硬盘)中,一旦机器发生宕机等情况导致数据清空,那么 Redis 上保存的数据也会被一同清空,这时候如果数据库没有进行保存,那么数据就会丢失了。因此,我们需要确保 Redis 中的数据通过适当的同步机制与数据库中的数据实时同步。
同步 Redis 缓存和数据库的数据主要有两种方式:
1. 直接同步
这种同步方式不仅可以保证数据的一致性,还可以将 Redis 与数据库中的数据完全同步。在这种方式下,每次添加、删除或更新 Redis 中的数据时,都会与数据库进行同步。
下面是一个 Python 示例,它演示了如何在 Redis 中使用 Pandas 数据框(DataFrame) 并将其与 MySQL 数据库中的数据同步。此示例将使用 Redis 作为主数据库,MySQL 作为从数据库。当在 Redis 中添加,更新或删除了一行数据时,这个变化将立即被同步到 MySQL 数据库中
// javascriptcn.com 代码示例 import pandas as pd import pymysql.cursors import redis redis_host = "localhost" redis_port = 6379 redis_password = "" mysql_host = "localhost" mysql_user = "user" mysql_password = "password" mysql_db = "test" red = redis.Redis(host=redis_host, port=redis_port, password=redis_password) mysql = pymysql.connect(host=mysql_host,user=mysql_user, password=mysql_password,db=mysql_db, cursorclass=pymysql.cursors.DictCursor) def insert_data_to_redis(df, redis_key): redis_df = df.to_msgpack() red.set(redis_key, redis_df) def get_data_from_mysql(): sql = "SELECT * from customers" df = pd.read_sql(sql, mysql) return df def sync_redis_mysql(table_name): df = get_data_from_mysql() insert_data_to_redis(df, table_name)
2. 延迟同步
这种同步方式不会立即将 Redis 中的更改传递给数据库,相反,延迟同步将更改汇总到队列中,以便稍后进行批处理。这种方法比直接同步更加有效,因为它减少了数据库的压力,同时也保证了数据一致性。
下面是一个 Python 示例,它演示了如何在 Redis 中使用 Pandas 数据框并将其与 MySQL 数据库中的数据进行延迟同步。此示例将使用 Redis 作为主数据库,MySQL 作为从数据库,并且将维护一个“更新”队列和一个“删除”队列。每当 Redis 中的数据被更新或删除时,将会将其添加到队列中。
// javascriptcn.com 代码示例 import pandas as pd import pymysql.cursors import redis redis_host = "localhost" redis_port = 6379 redis_password = "" mysql_host = "localhost" mysql_user = "user" mysql_password = "password" mysql_db = "test" red = redis.Redis(host=redis_host, port=redis_port, password=redis_password) redis_q_update = "update_queue" redis_q_delete = "delete_queue" mysql = pymysql.connect(host=mysql_host,user=mysql_user, password=mysql_password,db=mysql_db, cursorclass=pymysql.cursors.DictCursor) def insert_data_to_redis(df, redis_key): redis_df = df.to_msgpack() red.set(redis_key, redis_df) def add_to_update_queue(data, table_name): o = {"table_name": table_name, "data": data} red.rpush(redis_q_update, json.dumps(o)) def add_to_delete_queue(data, table_name): o = {"table_name": table_name, "data": data} red.rpush(redis_q_delete, json.dumps(o)) def update_data_in_redis(df, redis_key, table_name): redis_df = df.to_msgpack() add_to_update_queue(redis_df, table_name) red.set(redis_key, redis_df) def delete_data_in_redis(df, redis_key, table_name): redis_df = df.to_msgpack() add_to_delete_queue(redis_df, table_name) red.set(redis_key, redis_df) def get_data_from_mysql(): sql = "SELECT * from customers" df = pd.read_sql(sql, mysql) return df def sync_redis_mysql(table_name): df = get_data_from_mysql() insert_data_to_redis(df, table_name) def process_redis_update_queue(): while True: msgs = red.blpop(redis_q_update, timeout=0) o = json.loads(msgs[1]) table_name = o["table_name"] data = pd.read_msgpack(o["data"]) insert_data_to_mysql(data, table_name) def process_redis_delete_queue(): while True: msgs = red.blpop(redis_q_delete, timeout=0) o = json.loads(msgs[1]) table_name = o["table_name"] data = pd.read_msgpack(o["data"]) delete_data_from_mysql(data, table_name)
总结
使用 Redis 与关系型数据库进行数据同步是一个很常见的需求,并且只要我们了解了 Redis 内建的同步功能,就可以很容易地实现它。同步 Redis 与数据库的数据有两种方式:一种是直接同步方式,另一种是延迟同步方式。根据不同的用例,我们可以选择不同的同步策略来确保数据的一致性,并最大程度地减少数据存储和读取操作的成本。
在开发过程中,正确地使用同步机制可以保证数据的一致性,避免错误的发生,提高应用程序性能。因此,我们应该认真思考这个问题并作出明智的选择。
来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/653f6d647d4982a6eb8fc684