在互联网时代,数据是企业重要的资产之一,因此数据的迁移是一个非常重要的技术工作。数据迁移需要考虑多种因素,如存储、带宽、安全性、稳定性等。而 Serverless 技术则是一种可以大幅度减少迁移成本的解决方案。
Serverless 简介
Serverless 是一种新兴的云计算模式,它将计算资源的管理交给云平台,并且按使用量计费。开发者只需要提供代码,其余的部署、扩容、高可用都由云平台实现,从而大幅度减少了资源管理和部署的成本。现在的 Serverless 平台包括 AWS Lambda、阿里云函数计算等。
大规模数据迁移实践
我们通过一个生产实践案例,来展示如何基于 Serverless 进行大规模数据迁移。
1. 背景
某公司要将一批数据从一个云存储平台迁移到另一个云存储平台,该数据包含几百万个文件,总大小为 10 TB。迁移期间需要保证数据的完整性,且不会对生产环境造成影响。
2. 设计
我们设计了一个基于 Serverless 的数据迁移方案,其主要部分包括上传器、下载器和完整性验证器。其中上传器和下载器使用 AWS Lambda,完整性验证器使用 AWS SNS 和 SQS。
我们将下载器和完整性验证器部署在源云存储平台所在的区域,上传器和完整性验证器部署在目标云存储平台所在的区域,从而减少了跨区域传输时的网络费用和延迟。此外,由于数据较大,我们使用了分块上传和下载,提高上传和下载速度,并且方便进行数据校验。
2.1 下载器
我们使用 PILA(Parallel Interleaved Line Access)算法来并行下载文件的多个块,通过 AWS Lambda 实现了多线程下载,并且使用 MD5 算法对下载的数据进行校验。如果下载的数据不正确,我们将在 SQS 上发出一个报警。
以下是下载器的示例代码:
// javascriptcn.com 代码示例 import boto3 import requests import hashlib from concurrent.futures import ThreadPoolExecutor s3 = boto3.client('s3') def download_file(url, offset, size): headers={'Range': f'bytes={offset}-{offset + size - 1}'} return requests.get(url, headers=headers).content def download_parts(bucket, key, url, size, chunk_size, workers): futures = [] with ThreadPoolExecutor(workers) as executor: for i in range(0, size, chunk_size): chunk_start = i chunk_end = min(i + chunk_size, size) - 1 future = executor.submit(download_file, url, chunk_start, chunk_size) futures.append(future) results = [f.result() for f in futures] content = b''.join(results) md5 = hashlib.md5(content).hexdigest() s3.put_object(Body=content, Bucket=bucket, Key=key) return md5
2.2 上传器
上传器同样使用 AWS Lambda 实现了分块上传,对上传的数据进行 MD5 校验,并将结果写入 SQS。为了提高上传速度,我们将上传分块的大小调整为 10 MB,同时使用多线程进行分块上传。
以下是上传器的示例代码:
// javascriptcn.com 代码示例 import boto3 import requests import hashlib from concurrent.futures import ThreadPoolExecutor s3 = boto3.client('s3') def upload_part(bucket, key, url, offset, part_size): chunk = s3.get_object(Bucket=bucket, Key=key, Range=f'bytes={offset}-{offset + part_size - 1}')['Body'].read() headers = {'Content-Length': str(len(chunk)), 'Content-MD5': hashlib.md5(chunk).hexdigest()} requests.put(url, headers=headers, data=chunk) def upload_parts(bucket, key, url, size, chunk_size, workers): upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)['UploadId'] futures = [] with ThreadPoolExecutor(workers) as executor: for i in range(0, size, chunk_size): chunk_start = i chunk_end = min(i + chunk_size, size) - 1 part_size = chunk_end - chunk_start + 1 part_number = (chunk_start // chunk_size) + 1 upload_url = requests.post(url, json={'bucket': bucket, 'key': key, 'part_number': part_number, 'upload_id': upload_id}).json()['url'] future = executor.submit(upload_part, bucket, key, upload_url, chunk_start, part_size) futures.append(future) [f.result() for f in futures] parts = [{'PartNumber': (i // chunk_size) + 1, 'ETag': hashlib.md5(s3.get_object(Bucket=bucket, Key=key, Range=f'bytes={i}-{min(i + chunk_size - 1, size - 1)}')['Body'].read()).hexdigest()} for i in range(0, size, chunk_size)] s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) md5 = hashlib.md5(s3.get_object(Bucket=bucket, Key=key)['Body'].read()).hexdigest() return md5
2.3 完整性验证器
完整性验证器使用 S3 的 PUT 操作触发 Lambda 函数,每个 Lambda 函数用于校验一个对象的数据完整性,如果完整性无误,则将结果发送到 SQS 上。
以下是完整性验证器的示例代码:
// javascriptcn.com 代码示例 import boto3 import hashlib s3 = boto3.client('s3') sns = boto3.client('sns') sqs = boto3.client('sqs') def validate_object(bucket, key): md5 = hashlib.md5(s3.get_object(Bucket=bucket, Key=key)['Body'].read()).hexdigest() queue_url = sqs.get_queue_url(QueueName='validation-result')['QueueUrl'] sns.publish(Message=md5, TopicArn='validation-result-topic') sqs.send_message(QueueUrl=queue_url, MessageBody=key) def lambda_handler(event, context): bucket = event['Records'][0]['s3']['bucket']['name'] key = event['Records'][0]['s3']['object']['key'] validate_object(bucket, key)
3. 总结
基于 Serverless 实现的数据迁移方案,可以大幅度减少资源管理和部署的成本,同时保证了数据的完整性和稳定性。通过这个实践案例,我们可以提供参考的思路和方案。当然,对于不同的数据迁移场景,需要进行更深入的技术分析和设计调整。
来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/6541ca107d4982a6ebb6845c