August 2, 2021

Cross Region DynamoDB Backups

I’ve been using DynamoDB for a few projects recently and ran into something that I didn’t really expect to be a problem, but for which there isn’t an out-of-the-box solution.

Cross Region Snapshots

It seems that while you can do point-in-time recovery and create manual snapshots, there is no way to copy these snapshots to another region.

There does, however, exist a process to restore a table from a snapshot taken in another region. Of course, this doesn’t help when that region no longer exists (for whatever reason that may be).

I find it rather amusing that AWS has a control in some of its best practices to ensure that databases have backups in another region that are usable as part of your DR/BCP restoration processes, yet there’s no native way to achieve that with DynamoDB backups.

I came up with the following solution:

  1. Define a source and a destination region
  2. List all the tables in the source
  3. Create a backup of each table in the source region
  4. Use the above backup to create a new table in the destination region
  5. Create a backup in the destination region
  6. Delete the table in the destination region
  7. Delete any backups older than a retention period (e.g. 1 day) in both regions

This is likely not very efficient: it could be slow if the read/write capacity on the destination table isn’t high enough, and it could be reasonably expensive if you have a lot of data.
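If capacity on the restored table did turn out to be the bottleneck, one option I haven’t tried as part of this script would be to restore it in on-demand mode. restore_table_from_backup accepts a BillingModeOverride (and a ProvisionedThroughputOverride) parameter, so a rough sketch, with placeholder table and backup names, might look like:

import boto3

ddb = boto3.client('dynamodb', region_name='ap-southeast-1')

ddb.restore_table_from_backup(
    TargetTableName='my-table',                   # placeholder
    BackupArn='arn:aws:dynamodb:...:backup/...',  # placeholder
    BillingModeOverride='PAY_PER_REQUEST',        # on-demand instead of provisioned capacity
    SSESpecificationOverride={
        'Enabled': True,
        'SSEType': 'KMS'
    }
)

The same override could be slotted straight into the cross_region_restore function in the script below.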

I also haven’t run into any issues with GSIs, and since you can create these after the fact they shouldn’t be too much of a problem. I’d guess that LSIs might be trickier, since they can only be defined when the table is created, but again, I haven’t looked at that too closely.
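Similarly, if indexes did become a problem, the same restore call accepts GlobalSecondaryIndexOverride and LocalSecondaryIndexOverride parameters, which let you exclude some or all of the indexes at restore time (excluded GSIs could be re-created afterwards with update_table). A minimal sketch, reusing the client and placeholder names from the sketch above:

ddb.restore_table_from_backup(
    TargetTableName='my-table',                   # placeholder
    BackupArn='arn:aws:dynamodb:...:backup/...',  # placeholder
    GlobalSecondaryIndexOverride=[]               # exclude all GSIs from the restore
)

With those caveats out of the way, here’s the backup script: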

#!/usr/bin/env python
"""
Create a cross region backup

We do this by:
# Get tables
# Take backup
# Wait for backup
# List backups, sort by recent, get recent id
# Restore backup to other region
# Wait for table to be active
# take backup
# wait for backup to be ready
# delete table
# delete old backups

# build a container, make it run regularly
"""

import os
import time
import datetime
import logging
import multiprocessing
import boto3

logging.basicConfig(level=logging.INFO)

PROCESSES = int(os.environ.get('PROCESSES', default=4))
SOURCE_REGION = os.environ.get('SOURCE_REGION', default='ap-southeast-2')
DESTINATION_REGION = os.environ.get('DESTINATION_REGION', default='ap-southeast-1')
RETENTION_DAYS = int(os.environ.get('RETENTION_DAYS', default=1))

ddbaps2 = boto3.client('dynamodb', region_name=SOURCE_REGION)
ddbaps1 = boto3.client('dynamodb', region_name=DESTINATION_REGION)

isonow = datetime.datetime.now().isoformat().replace(':', '_')

def make_backup(table_name, client):
    """Make a backup and wait for it to be available then return the ARN"""

    logging.info("Creating backup for %s", table_name)

    try:
        backup = client.create_backup(
            TableName=table_name,
            BackupName=table_name + isonow
        )

        status = backup['BackupDetails']['BackupStatus']
        logging.info("Backup status for %s is %s", table_name, status)

        while status != 'AVAILABLE':
            time.sleep(30)
            backup_description = client.describe_backup(
                BackupArn=backup['BackupDetails']['BackupArn']
            )
            status = backup_description['BackupDescription']['BackupDetails']['BackupStatus']
            logging.info("Backup status for %s is %s", table_name, status)

        return backup['BackupDetails']['BackupArn']
    except Exception as error: #pylint: disable=broad-except
        logging.error("An error occurred creating the backup: %s", error)
        raise

def cross_region_restore(table_name, backup_arn, client):
    """Restore the table in another region, wait"""

    logging.info("Creating cross region restore for %s from %s", table_name, backup_arn)

    try:
        client.restore_table_from_backup(
            TargetTableName=table_name,
            BackupArn=backup_arn,
            SSESpecificationOverride={
                'Enabled': True,
                'SSEType': 'KMS'
            }
        )

        waiter = client.get_waiter('table_exists')

        waiter.wait(
            TableName=table_name
        )
    except Exception as error: #pylint: disable=broad-except
        logging.error("An error occurred creating the table: %s", error)
        raise

def delete_table(table_name, client):
    """Delete a table"""

    logging.info("Deleting table %s", table_name)

    try:
        client.delete_table(
            TableName=table_name
        )

        waiter = client.get_waiter('table_not_exists')

        waiter.wait(
            TableName=table_name
        )
    except Exception as error: #pylint: disable=broad-except
        logging.error("An error occurred deleting the table: %s", error)
        raise

def delete_old_backups(days, table_name, client):
    """Get a list of backups days old, and then delete them"""

    logging.info("Delete backups for %s that are older than %s days", table_name, days)

    try:
        backups = client.list_backups(
            TableName=table_name,
            TimeRangeUpperBound=datetime.datetime.now() - datetime.timedelta(days=days),
            BackupType='USER'
        )

        for backup in backups['BackupSummaries']:
            logging.info("Deleting backup %s", backup['BackupArn'])
            client.delete_backup(
                BackupArn=backup['BackupArn']
            )
    except Exception as error: #pylint: disable=broad-except
        logging.error("An error occurred deleting old backups: %s", error)
        raise

def run_backup_process(taskqueue):
    """Run the whole backup process"""
    while not taskqueue.empty():
        table_name = taskqueue.get()
        try:
            backuparn = make_backup(table_name, ddbaps2)
            cross_region_restore(table_name, backuparn, ddbaps1)
            make_backup(table_name, ddbaps1)
            delete_table(table_name, ddbaps1)
            delete_old_backups(RETENTION_DAYS, table_name, ddbaps1)
            delete_old_backups(RETENTION_DAYS, table_name, ddbaps2)
        except Exception as error: #pylint: disable=broad-except
            logging.error("An error occurred for the loop: %s", error)

if __name__ == "__main__":

    processes = []
    start = time.time()
    task_queue = multiprocessing.Queue()

    tables = ddbaps2.list_tables()
    for table in tables['TableNames']:
        task_queue.put(table)

    for n_proc in range(PROCESSES): #pylint: disable=unused-variable
        process = multiprocessing.Process(target=run_backup_process, args=(task_queue,))
        processes.append(process)
        process.start()
    for process in processes:
        process.join()
    logging.info("Time taken = %s", round(time.time() - start,2))

Deployment

This solution is written in Python and I deploy it into ECS with CDK.

CDK has some excellent pre-canned solutions for various purposes, though sometimes the assumptions the team(s) make can be frustrating, and while escape hatches can sometimes help, they aren’t always possible.
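For anyone who hasn’t used one, an escape hatch just means reaching down from the high-level construct to the underlying CloudFormation resource and overriding it directly. The stack further down does exactly this to enable FARGATE_SPOT on the cluster, roughly like so (assuming an ecs.Cluster called cluster, as defined in that stack):

# Grab the low-level Cfn resource behind the L2 construct and override a
# property the construct doesn't expose directly.
cluster_child = cluster.node.default_child
cluster_child.add_override(
    "Properties.CapacityProviders",
    ['FARGATE', 'FARGATE_SPOT']
)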

This is one of those times where an escape hatch isn’t really an option. CDK’s ecs-patterns has what you’d think would be a perfect solution: ScheduledFargateTask. The issue is that it expects the container to run in private subnets and to rely on everything being available within the VPC. This was almost OK with Gateway and Interface endpoints, but to reach the destination region you would need to set up a VPC there, Gateway and/or Interface endpoints for each service, and then cross-region VPC peering. Not an ideal solution; not impossible, but it was becoming more complex than it was worth.
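For reference, the pattern-based version would have looked something like the sketch below (written from memory inside the stack’s __init__, so prop names may differ slightly between CDK versions, and it assumes the cluster and image asset defined in the stack that follows). It wasn’t the amount of code that was the problem, it was the networking assumptions behind it:

from aws_cdk import (
    core as cdk,
    aws_ecs as ecs,
    aws_ecs_patterns as ecs_patterns,
    aws_applicationautoscaling as appscaling,
)

# Sketch only: the scheduled pattern runs the task on its default networking
# (private subnets), which is what didn't fit here.
ecs_patterns.ScheduledFargateTask(self, 'scheduled_task',
    cluster=cluster,
    schedule=appscaling.Schedule.rate(cdk.Duration.hours(1)),
    scheduled_fargate_task_image_options=ecs_patterns.ScheduledFargateTaskImageOptions(
        image=ecs.ContainerImage.from_docker_image_asset(cross_region_backup_task_asset),
        memory_limit_mib=512
    )
)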

I went back to first principles and built my own scheduled task from the individual components.

"""Deploy a task to backup DynamoDB tables"""
import os
from aws_cdk.aws_ecr_assets import DockerImageAsset
from aws_cdk import (
        core as cdk,
        aws_ecs as ecs,
        aws_ec2 as ec2,
        aws_iam as iam,
        aws_events as events,
        )

dirname = os.path.dirname(__file__)

class BackuptablesStack(cdk.Stack): #pylint: disable=too-few-public-methods
    """Deploy a task to backup DynamoDB tables"""
    def __init__(self, scope: cdk.Construct, construct_id: str, **kwargs) -> None: #pylint: disable=too-many-locals
        super().__init__(scope, construct_id, **kwargs)

        vpc = ec2.Vpc(self, 'app_vpc',
            max_azs=3,
            subnet_configuration=[ec2.SubnetConfiguration(
                cidr_mask=24,
                name='public',
                subnet_type=ec2.SubnetType.PUBLIC
            )],
            nat_gateways=0
        )

        interface_endpoints = {
            'ECR': ec2.InterfaceVpcEndpointAwsService.ECR,
            'ECS': ec2.InterfaceVpcEndpointAwsService.ECS,
            'ECR_DOCKER': ec2.InterfaceVpcEndpointAwsService.ECR_DOCKER,
            'ECS_TELEMETRY': ec2.InterfaceVpcEndpointAwsService.ECS_TELEMETRY,
            'ECS_AGENT': ec2.InterfaceVpcEndpointAwsService.ECS_AGENT,
            'CLOUDWATCH': ec2.InterfaceVpcEndpointAwsService.CLOUDWATCH,
            'CLOUDWATCH_LOGS': ec2.InterfaceVpcEndpointAwsService.CLOUDWATCH_LOGS
        }

        vpc.add_gateway_endpoint(
            'DYNAMODB',
            service=ec2.GatewayVpcEndpointAwsService.DYNAMODB
        )

        vpc.add_gateway_endpoint(
            'S3',
            service=ec2.GatewayVpcEndpointAwsService.S3
        )

        for name, service in interface_endpoints.items():
            vpc.add_interface_endpoint(name,
                service=service
            )

        cluster = ecs.Cluster(self, 'app_cluster', vpc=vpc)

        cluster_child = cluster.node.default_child
        cluster_child.add_override(
            "Properties.CapacityProviders",
            ['FARGATE', 'FARGATE_SPOT']
        )

        cross_region_backup_task_asset = DockerImageAsset(self, 'task',
            directory=os.path.join(dirname, '..', 'task')
        )

        task_def = ecs.FargateTaskDefinition(self, 'task_def')

        task_def.add_to_task_role_policy(
            iam.PolicyStatement(
                actions=[
                    'dynamodb:*'
                ],
                resources=['*']
            )
        )

        task_def.add_container('container',
            logging=ecs.LogDriver.aws_logs(
                stream_prefix='backuptables'
            ),
            image=ecs.ContainerImage.from_ecr_repository(
                cross_region_backup_task_asset.repository,
                cross_region_backup_task_asset.image_uri.rpartition(":")[-1]
            )
        )

        task_role = iam.Role(self, 'task_role',
            assumed_by=iam.ServicePrincipal('events.amazonaws.com')
        )

        task_role.attach_inline_policy(
            iam.Policy(self, 'task_policy',
                statements=[
                    iam.PolicyStatement(
                        actions=['ecs:RunTask'],
                        resources=[task_def.task_definition_arn],
                        conditions={
                            "ArnEquals": {"ecs:cluster": cluster.cluster_arn}
                        }
                    ),
                    iam.PolicyStatement(
                        actions=['iam:PassRole'],
                        resources=[task_def.execution_role.role_arn]
                    ),
                    iam.PolicyStatement(
                        actions=['iam:PassRole'],
                        resources=[task_def.task_role.role_arn]
                    )
                ]
            )
        )

        task_sec_group = ec2.SecurityGroup(self, 'task_sec_group',
            vpc=vpc,
            allow_all_outbound=True
        )

        selection = vpc.select_subnets(
            subnet_type=ec2.SubnetType.PUBLIC
        )

        subnet_ids=[]
        for subnet in selection.subnets:
            subnet_ids.append(subnet.subnet_id)

        ecs_network_config = events.CfnRule.NetworkConfigurationProperty(
            aws_vpc_configuration=events.CfnRule.AwsVpcConfigurationProperty(
                subnets=subnet_ids,
                security_groups=[task_sec_group.security_group_id],
                assign_public_ip='ENABLED'
            )
        )

        ecs_params = events.CfnRule.EcsParametersProperty(
            launch_type='FARGATE',
            task_count=1,
            network_configuration=ecs_network_config,
            task_definition_arn=task_def.task_definition_arn
        )

        target = events.CfnRule.TargetProperty(
            id='Target0',
            arn=cluster.cluster_arn,
            ecs_parameters=ecs_params,
            role_arn=task_role.role_arn
        )

        events.CfnRule(self, 'event_rule',
            schedule_expression="rate(1 hour)",
            state="ENABLED",
            targets=[target]
        )

Dockerfile

The Dockerfile is really simple. We just need boto3 and the backup script:

FROM python:latest

ARG DEBIAN_FRONTEND="noninteractive"
ENV TZ=Australia/Sydney

RUN apt-get update && \
    apt-get dist-upgrade -y

WORKDIR /srv
COPY requirements.txt /srv
RUN pip install -r requirements.txt
COPY backuptables.py /srv

CMD ["./backuptables.py"]

Restore

I’ve also written a rudimentary script to restore the most recent backup of each table.

#!/usr/bin/env python

"""Restore tables"""

import os
import time
import datetime
import logging
import multiprocessing
import boto3

logging.basicConfig(level=logging.INFO)

PROCESSES = int(os.environ.get('PROCESSES', default=4))

DESTINATION_REGION = os.environ.get('DESTINATION_REGION', default='ap-southeast-1')

ddbaps1 = boto3.client('dynamodb', region_name=DESTINATION_REGION)

def restore_table(table):
    """Restore a table"""
    table_name = table['TableName']
    backup_arn = table['BackupArn']

    logging.info("Restoring %s from %s.", table_name, str(table['BackupCreationDateTime']))

    ddbaps1.restore_table_from_backup(
        TargetTableName=table_name,
        BackupArn=backup_arn,
        SSESpecificationOverride={
            'Enabled': True,
            'SSEType': 'KMS'
        }
    )

    waiter = ddbaps1.get_waiter('table_exists')

    waiter.wait(
        TableName=table_name
    )

def run_restore_process(taskqueue):
    """Run the whole restore process"""
    while not taskqueue.empty():
        table = taskqueue.get()
        restore_table(table)

if __name__ == "__main__":

    logging.info("Gathering backups to restore.")

    torestore={}

    backups = ddbaps1.list_backups(
        TimeRangeUpperBound=datetime.datetime.now() - datetime.timedelta(hours=2),
        BackupType='USER'
    )

    for backup in backups['BackupSummaries']:
        if backup['BackupStatus'] == 'AVAILABLE':
            if backup['TableName'] not in torestore.keys():
                torestore[backup['TableName']] = {
                    'BackupCreationDateTime': backup['BackupCreationDateTime'],
                    'BackupArn': backup['BackupArn'],
                    'TableName': backup['TableName']
                }
            else:
                #pylint: disable=line-too-long
                if backup['BackupCreationDateTime'] > torestore[backup['TableName']]['BackupCreationDateTime']:
                    torestore[backup['TableName']] = {
                        'BackupCreationDateTime': backup['BackupCreationDateTime'],
                        'BackupArn': backup['BackupArn'],
                        'TableName': backup['TableName']
                    }

    processes = []
    start = time.time()
    task_queue = multiprocessing.Queue()

    for restoreitem in torestore.values():
        task_queue.put(restoreitem)

    for n_proc in range(PROCESSES): #pylint: disable=unused-variable
        process = multiprocessing.Process(target=run_restore_process, args=(task_queue,))
        processes.append(process)
        process.start()
    for process in processes:
        process.join()
    logging.info("Time taken = %s", round(time.time() - start,2))

Hopefully you find this solution interesting and it solves a problem for you.
