Hi --
I do have an automated solution that is working well to maintain CD2 data in a PostgreSQL database. I've opted to use AWS Lambda functions and AWS Step Functions to implement the workflow. Because of Lambda's 15-minute runtime limit, I'm currently seeing occasional timeouts when syncing individual tables, but those tables are typically synced successfully on the next run. I'm currently refreshing everything once an hour.
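For anyone curious how the hourly trigger is wired up: an EventBridge rule that starts the state machine on a schedule is the simplest approach. Here's a minimal sketch using boto3 -- the rule name, ARNs, and account number are placeholders, not my actual resources:

import boto3

events = boto3.client('events')

# placeholder names/ARNs -- substitute your own resources
RULE_NAME = 'cd2-hourly-refresh'
STATE_MACHINE_ARN = 'arn:aws:states:us-east-1:123456789012:stateMachine:cd2-sync'
ROLE_ARN = 'arn:aws:iam::123456789012:role/cd2-events-role'  # needs states:StartExecution

# fire once an hour
events.put_rule(Name=RULE_NAME, ScheduleExpression='rate(1 hour)')

# point the rule at the state machine
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{'Id': 'cd2-sync', 'Arn': STATE_MACHINE_ARN, 'RoleArn': ROLE_ARN}],
)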
Here's a diagram of the Step Function that orchestrates the workflow:
There's a lot going on there, so I'll break it down:
- The first step is to fetch the list of tables from the DAP API using the ListTables Lambda function. The list of tables is passed to the next step.
- The second step is to iterate over the list of tables, running the SyncTable Lambda function for each one. This function essentially runs "dap syncdb".
- Next I check the output of the SyncTable function -- if there's an error indicating that the table doesn't exist yet in the local database, I call the InitTable Lambda function, which essentially runs "dap initdb".
- Once all of the iterations are complete, a notification is sent with the list of tables that were processed.
The iteration step can process multiple tables in parallel -- I'm currently using a maximum concurrency of 30. The whole process typically takes only a few minutes to complete; a sketch of the state machine definition is below.
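To make the orchestration concrete, here's a trimmed-down version of the state machine definition (Amazon States Language), expressed as a Python dict. The Lambda ARNs, state names, and SNS topic are illustrative placeholders, and I've left out the retry/catch configuration:

import json

definition = {
    'StartAt': 'ListTables',
    'States': {
        'ListTables': {
            'Type': 'Task',
            'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:ListTables',
            'Next': 'SyncEachTable',
        },
        'SyncEachTable': {
            'Type': 'Map',
            'ItemsPath': '$.tables',   # the list returned by ListTables
            'MaxConcurrency': 30,      # sync up to 30 tables in parallel
            'Iterator': {
                'StartAt': 'SyncTable',
                'States': {
                    'SyncTable': {
                        'Type': 'Task',
                        'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:SyncTable',
                        'Next': 'CheckResult',
                    },
                    'CheckResult': {
                        'Type': 'Choice',
                        'Choices': [{
                            'Variable': '$.state',
                            'StringEquals': 'needs_init',
                            'Next': 'InitTable',
                        }],
                        'Default': 'TableDone',
                    },
                    'InitTable': {
                        'Type': 'Task',
                        'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:InitTable',
                        'Next': 'TableDone',
                    },
                    'TableDone': {'Type': 'Succeed'},
                },
            },
            'Next': 'Notify',
        },
        'Notify': {
            'Type': 'Task',
            'Resource': 'arn:aws:states:::sns:publish',
            'Parameters': {
                'TopicArn': 'arn:aws:sns:us-east-1:123456789012:cd2-sync-results',
                'Message.$': 'States.JsonToString($)',
            },
            'End': True,
        },
    },
}

print(json.dumps(definition, indent=2))

The Map state's output is the array of per-table result events, which is what the final notification step publishes.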
I still have some work to do to better handle (or avoid) Lambda timeouts, and I need to add better error/output checking and reporting for each of the steps. But this does seem to be a workable design. I'm expecting that once the dap library offers a more developer-oriented API, I'll be able to refactor this to make it both more efficient and more robust.
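One idea I'm considering for the timeouts (just a sketch, nothing I've deployed): have the handler watch the time remaining on the invocation and give up cleanly before Lambda kills it, so the table gets a distinct state that the state machine can treat as a soft failure instead of a hard timeout. The sync_with_deadline helper below is hypothetical:

import asyncio

SAFETY_MARGIN_MS = 30_000  # stop this far short of the invocation deadline

async def sync_with_deadline(coro, context):
    # get_remaining_time_in_millis() is provided by the Lambda runtime context
    budget = (context.get_remaining_time_in_millis() - SAFETY_MARGIN_MS) / 1000
    try:
        await asyncio.wait_for(coro, timeout=budget)
        return 'complete'
    except asyncio.TimeoutError:
        # signal the state machine to retry this table on a later run
        return 'timed_out'

Whether cancelling sync_db mid-flight leaves the database in a clean state depends on how the library manages its transactions, so that would need verifying first.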
I don't recommend trying to implement this yourself just yet, as it's still a rough work in progress, but I'll share the Lambda code below so you can get a better sense of how it works.
--Colin
ListTables function
import asyncio
import os

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.typing import LambdaContext
from botocore.config import Config
from dap.api import DAPClient
from dap.dap_types import Credentials

region = os.environ.get('AWS_REGION')
config = Config(region_name=region)
ssm_provider = parameters.SSMProvider(config=config)

logger = Logger()

env = os.environ.get('ENV', 'dev')
param_path = f'/{env}/canvas_data_2'
api_base_url = os.environ.get('API_BASE_URL', 'https://api-gateway.instructure.com')
namespace = 'canvas'


@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context: LambdaContext):
    # DAP credentials are stored in SSM Parameter Store under /{env}/canvas_data_2
    params = ssm_provider.get_multiple(param_path, max_age=600, decrypt=True)
    dap_client_id = params['dap_client_id']
    dap_client_secret = params['dap_client_secret']
    logger.info(f"dap_client_id: {dap_client_id}")

    credentials = Credentials.create(client_id=dap_client_id, client_secret=dap_client_secret)
    tables = asyncio.get_event_loop().run_until_complete(
        async_get_tables(api_base_url, credentials, namespace)
    )

    # we can skip certain tables if necessary by setting an environment variable (comma-separated list)
    skip_tables = os.environ.get('SKIP_TABLES', '').split(',')

    # shape each entry as {'table_name': ...} so the Map state can pass it straight to SyncTable
    tmap = [{'table_name': t} for t in tables if t not in skip_tables]
    return {'tables': tmap}


async def async_get_tables(api_base_url: str, credentials: Credentials, namespace: str):
    async with DAPClient(
        base_url=api_base_url,
        credentials=credentials,
    ) as session:
        return await session.get_tables(namespace)
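For reference, the return value looks something like this (illustrative table names; the real namespace has many more):

{
    'tables': [
        {'table_name': 'accounts'},
        {'table_name': 'courses'},
        {'table_name': 'enrollments'},
    ]
}

This is exactly the shape the Map state iterates over, handing each {'table_name': ...} dict to SyncTable as its event.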
SyncTable function
import asyncio
import os

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.typing import LambdaContext
from botocore.config import Config
from dap.dap_types import Credentials
from dap.actions.sync_db import sync_db
from dap.model.meta_table import NoMetadataError

region = os.environ.get('AWS_REGION')
config = Config(region_name=region)
ssm_provider = parameters.SSMProvider(config=config)

logger = Logger()

env = os.environ.get('ENV', 'dev')
param_path = f'/{env}/canvas_data_2'
api_base_url = os.environ.get('API_BASE_URL', 'https://api-gateway.instructure.com')
namespace = 'canvas'


def lambda_handler(event, context: LambdaContext):
    # DAP and database credentials both live in SSM Parameter Store
    params = ssm_provider.get_multiple(param_path, max_age=600, decrypt=True)
    dap_client_id = params['dap_client_id']
    dap_client_secret = params['dap_client_secret']
    db_user = params['db_default_user']
    db_password = params['db_default_password']
    db_name = params['db_default_name']
    db_host = params['db_default_host']
    db_port = params.get('db_default_port', 5432)
    conn_str = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

    credentials = Credentials.create(client_id=dap_client_id, client_secret=dap_client_secret)
    table_name = event['table_name']
    logger.info(f"syncing table: {table_name}")

    try:
        # equivalent of running "dap syncdb" for this one table
        asyncio.get_event_loop().run_until_complete(
            sync_db(
                base_url=api_base_url,
                namespace=namespace,
                table_name=table_name,
                credentials=credentials,
                connection_string=conn_str,
            )
        )
        event['state'] = 'complete'
    except NoMetadataError as e:
        # the table hasn't been initialized locally yet;
        # the Choice state routes this result to InitTable
        logger.exception(e)
        event['state'] = 'needs_init'

    logger.info(f"event: {event}")
    return event
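If you want to poke at this outside of Lambda, the handler can be smoke-tested with a stub event -- assuming your AWS credentials can read the SSM parameters and you have network access to the DAP API and the database. The table name here is just an example:

if __name__ == '__main__':
    result = lambda_handler({'table_name': 'accounts'}, None)
    print(result)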
InitTable function
import asyncio
import os

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.typing import LambdaContext
from botocore.config import Config
from dap.dap_types import Credentials
from dap.actions.init_db import init_db

region = os.environ.get('AWS_REGION')
config = Config(region_name=region)
ssm_provider = parameters.SSMProvider(config=config)

logger = Logger()

env = os.environ.get('ENV', 'dev')
param_path = f'/{env}/canvas_data_2'
api_base_url = os.environ.get('API_BASE_URL', 'https://api-gateway.instructure.com')
namespace = 'canvas'


def lambda_handler(event, context: LambdaContext):
    # same parameter lookup as SyncTable
    params = ssm_provider.get_multiple(param_path, max_age=600, decrypt=True)
    dap_client_id = params['dap_client_id']
    dap_client_secret = params['dap_client_secret']
    db_user = params['db_default_user']
    db_password = params['db_default_password']
    db_name = params['db_default_name']
    db_host = params['db_default_host']
    db_port = params.get('db_default_port', 5432)
    conn_str = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

    credentials = Credentials.create(client_id=dap_client_id, client_secret=dap_client_secret)
    table_name = event['table_name']
    logger.info(f"initializing table: {table_name}")

    try:
        # equivalent of running "dap initdb" for this one table
        asyncio.get_event_loop().run_until_complete(
            init_db(
                base_url=api_base_url,
                namespace=namespace,
                table_name=table_name,
                credentials=credentials,
                connection_string=conn_str,
            )
        )
        event['state'] = 'complete'
    except Exception as e:
        logger.exception(e)
        event['state'] = 'failed'

    logger.info(f"event: {event}")
    return event