D6924: ci: store job start parameters in DynamoDB
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Tue Oct 1 03:58:22 UTC 2019
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
This makes it vastly easier to retry or retrigger a job,
since the instance launch parameters will be stored in the
job's DynamoDB record instead of in an ad-hoc SQS message.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D6924
AFFECTED FILES
contrib/ci/lambda_functions/ci.py
contrib/ci/terraform/job_executor.tf
CHANGE DETAILS
diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf
--- a/contrib/ci/terraform/job_executor.tf
+++ b/contrib/ci/terraform/job_executor.tf
@@ -16,6 +16,11 @@
runtime = "python3.7"
timeout = 60
role = aws_iam_role.lambda_ci_run_pending_job.arn
+ environment {
+ variables = {
+ DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+ }
+ }
}
resource "aws_cloudwatch_log_group" "lambda_ci_run_pending_job" {
@@ -63,6 +68,16 @@
]
resources = ["*"]
}
+ # Allow querying job state in DynamoDB.
+ statement {
+ effect = "Allow"
+ actions = [
+ "dynamodb:GetItem",
+ ]
+ resources = [
+ aws_dynamodb_table.ci_job.arn,
+ ]
+ }
}
resource "aws_iam_role_policy" "lambda_ci_handle_pending_job" {
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
--- a/contrib/ci/lambda_functions/ci.py
+++ b/contrib/ci/lambda_functions/ci.py
@@ -54,16 +54,17 @@
def handle_pending_job(event, context):
"""Handler for starting a job from an SQS message."""
ec2 = boto3.client('ec2')
+ dynamodb = boto3.resource('dynamodb')
+
+ job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
for record in event['Records']:
body = record['body']
data = json.loads(body)
- user_data_template = data['user_data_template']
- user_data_params = data['user_data_params']
- ec2_instance_config = data['ec2_instance_launch_config']
+ job_id = data['job_id']
- start_pending_job(ec2, user_data_template, user_data_params, ec2_instance_config)
+ start_pending_job(ec2, job_table, job_id)
def handle_job_result_s3_artifact(event, context):
@@ -409,12 +410,6 @@
'SecurityGroups': ['hg-linux-dev-1'],
}
- job_params = {
- 'user_data_template': LINUX_USER_DATA,
- 'user_data_params': user_data_params,
- 'ec2_instance_launch_config': config,
- }
-
schedule_time = decimal.Decimal(time.time())
print('registering job in DynamoDB')
@@ -426,17 +421,35 @@
'build_number': build_number,
'execution_state': 'pending',
'schedule_time': schedule_time,
+ # We encode as JSON because DynamoDB doesn't like some types
+ # like empty strings.
+ 'start_params': json.dumps({
+ 'user_data_template': LINUX_USER_DATA,
+ 'user_data_params': user_data_params,
+ 'ec2_instance_launch_config': config,
+ }, sort_keys=True),
})
print('adding job to pending queue')
sqs.send_message(
QueueUrl=sqs_url,
- MessageBody=json.dumps(job_params, sort_keys=True)
+ MessageBody=json.dumps({'job_id': job_id})
)
-def start_pending_job(ec2, user_data_template, user_data_params, ec2_instance_config):
+def start_pending_job(ec2, job_table, job_id):
"""Called to request the start of a pending job."""
+ res = job_table.get_item(Key={'job_id': job_id}, ConsistentRead=True)
+ if 'Item' not in res:
+ print('unable to find job %s' % job_id)
+ return
+
+ job = res['Item']
+ start_params = json.loads(job['start_params'])
+ user_data_template = start_params['user_data_template']
+ user_data_params = start_params['user_data_params']
+ ec2_instance_config = start_params['ec2_instance_launch_config']
+
user_data = user_data_template.format(**user_data_params)
print('requesting spot instance for job %s' % user_data_params['job_id'])
To: indygreg, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel
mailing list