D6926: ci: retry expired spot instance requests
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Tue Oct 1 04:41:27 UTC 2019
indygreg updated this revision to Diff 16723.
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D6926?vs=16720&id=16723
CHANGES SINCE LAST ACTION
https://phab.mercurial-scm.org/D6926/new/
REVISION DETAIL
https://phab.mercurial-scm.org/D6926
AFFECTED FILES
contrib/ci/README.rst
contrib/ci/lambda_functions/ci.py
contrib/ci/lambda_functions/web.py
contrib/ci/terraform/job_executor.tf
contrib/ci/terraform/spot_instance_request_monitor.tf
contrib/ci/terraform/storage.tf
CHANGE DETAILS
diff --git a/contrib/ci/terraform/storage.tf b/contrib/ci/terraform/storage.tf
--- a/contrib/ci/terraform/storage.tf
+++ b/contrib/ci/terraform/storage.tf
@@ -55,8 +55,21 @@
name = "job_id"
type = "S"
}
+ attribute {
+ name = "execution_state"
+ type = "S"
+ }
hash_key = "job_id"
+
+ # This allows us to easily query for jobs in a specific state.
+ global_secondary_index {
+ name = "execution_state"
+ hash_key = "execution_state"
+ range_key = "job_id"
+ projection_type = "ALL"
+ }
+
}
# Tracks results for individual tests in each job.
diff --git a/contrib/ci/terraform/spot_instance_request_monitor.tf b/contrib/ci/terraform/spot_instance_request_monitor.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/spot_instance_request_monitor.tf
@@ -0,0 +1,94 @@
+# Defines resources to monitor spot instance requests.
+
+resource "aws_cloudwatch_log_group" "lambda_ci_spot_instance_request_monitor" {
+  name              = "/aws/lambda/${aws_lambda_function.ci_spot_instance_request_monitor.function_name}"
+  retention_in_days = 7 # keep one week of monitor logs
+}
+
+resource "aws_iam_role" "lambda_ci_spot_instance_request_monitor" {
+  name               = "lambda-ci-spot-instance-request-monitor"
+  description        = "For Lambda function monitoring spot instance requests"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Function that finds jobs whose spot instance requests expired and retries them.
+resource "aws_lambda_function" "ci_spot_instance_request_monitor" {
+  function_name    = "ci-spot-instance-request-monitor"
+  description      = "Monitors spot instance requests and triggers activity"
+  filename         = data.archive_file.lambda_ci.output_path
+  handler          = "ci.handle_spot_instance_request_monitor" # lambda_functions/ci.py
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime          = "python3.7"
+  timeout          = 60 # seconds
+  role             = aws_iam_role.lambda_ci_spot_instance_request_monitor.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE        = aws_dynamodb_table.ci_job.name # queried via its execution_state index
+      LAMBDA_START_JOB_FUNCTION = aws_lambda_function.ci_start_job.function_name # invoked to retry expired requests
+    }
+  }
+}
+
+data "aws_iam_policy_document" "lambda_ci_spot_instance_request_monitor" {
+  # Allow the Lambda function to write to its CloudWatch Logs log group.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [aws_cloudwatch_log_group.lambda_ci_spot_instance_request_monitor.arn]
+  }
+  # Allow checking spot instance request state (EC2 Describe* actions can't be resource-scoped).
+  statement {
+    effect = "Allow"
+    actions = [
+      "ec2:DescribeSpotInstanceRequests",
+    ]
+    resources = ["*"]
+  }
+  # Allow querying the job table; the "/*" entry also covers its execution_state index.
+  statement {
+    effect = "Allow"
+    actions = [
+      "dynamodb:Query",
+    ]
+    resources = [
+      aws_dynamodb_table.ci_job.arn,
+      "${aws_dynamodb_table.ci_job.arn}/*",
+    ]
+  }
+  # Allow invoking the start job Lambda function to retry expired requests.
+  statement {
+    effect    = "Allow"
+    actions   = ["lambda:InvokeFunction"]
+    resources = [aws_lambda_function.ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_spot_instance_request_monitor" {
+  role   = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  name   = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  policy = data.aws_iam_policy_document.lambda_ci_spot_instance_request_monitor.json
+}
+
+# Periodically trigger the Lambda function so state is continuously monitored.
+resource "aws_cloudwatch_event_rule" "trigger_ci_spot_instance_request_monitor" {
+  name                = "trigger-ci-spot-instance-request-monitor"
+  description         = "Trigger monitoring of spot instance requests"
+  schedule_expression = "rate(1 minute)" # once per spot request validity window (1 minute in ci.py)
+}
+
+resource "aws_cloudwatch_event_target" "ci_spot_instance_request_monitor" {
+  rule = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.name
+  arn  = aws_lambda_function.ci_spot_instance_request_monitor.arn
+}
+
+resource "aws_lambda_permission" "ci_spot_instance_request_monitor_allow_cloudwatch" {
+  statement_id  = "AllowExecutionFromCloudWatch"
+  action        = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.ci_spot_instance_request_monitor.function_name
+  principal     = "events.amazonaws.com" # CloudWatch Events invokes the monitor
+  source_arn    = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.arn # only this rule
+}
diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf
--- a/contrib/ci/terraform/job_executor.tf
+++ b/contrib/ci/terraform/job_executor.tf
@@ -204,3 +204,60 @@
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.trigger_instance_state_change.arn
}
+
+resource "aws_cloudwatch_log_group" "lambda_ci_start_job" {
+  name              = "/aws/lambda/${aws_lambda_function.ci_start_job.function_name}"
+  retention_in_days = 7 # keep one week of start job logs
+}
+
+resource "aws_iam_role" "lambda_ci_start_job" {
+  name               = "lambda-ci-start-job"
+  description        = "For Lambda function to trigger job start"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Lambda function for starting a job. This is similar to `run_pending_job`
+# except it is invoked directly with a {"job_id": ...} payload, not by SQS.
+resource "aws_lambda_function" "ci_start_job" {
+  function_name    = "ci-start-job"
+  description      = "Starts a scheduled CI job"
+  filename         = data.archive_file.lambda_ci.output_path
+  handler          = "ci.handle_start_job" # lambda_functions/ci.py
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime          = "python3.7"
+  timeout          = 60 # seconds
+  role             = aws_iam_role.lambda_ci_start_job.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+    }
+  }
+}
+
+# Reuse run_pending_job's policy, since this function also runs start_pending_job().
+resource "aws_iam_role_policy" "lambda_ci_start_job_run_pending_job" {
+  role   = aws_iam_role.lambda_ci_start_job.name
+  name   = "run-pending-job"
+  policy = data.aws_iam_policy_document.lambda_ci_run_pending_job.json
+}
+
+# Supplemental policy. NOTE(review): retries call ec2:DescribeSpotInstanceRequests; confirm the policy above grants it.
+data "aws_iam_policy_document" "lambda_ci_start_job" {
+  # Allow the Lambda function to write to its CloudWatch Logs log group.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [
+      aws_cloudwatch_log_group.lambda_ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_start_job" {
+  role   = aws_iam_role.lambda_ci_start_job.name
+  name   = aws_iam_role.lambda_ci_start_job.name
+  policy = data.aws_iam_policy_document.lambda_ci_start_job.json
+}
diff --git a/contrib/ci/lambda_functions/web.py b/contrib/ci/lambda_functions/web.py
--- a/contrib/ci/lambda_functions/web.py
+++ b/contrib/ci/lambda_functions/web.py
@@ -164,7 +164,8 @@
else:
skip_count = 'n/a'
- if job_info['execution_state'] in ('pending', 'spot-instance-requested', 'running'):
+ if job_info['execution_state'] in (
+ 'pending', 'spot-instance-requested', 'cancelled', 'running'):
job_state = job_info['execution_state']
elif job_info['execution_state'] == 'done':
exit_clean = job_info.get('exit_clean')
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
--- a/contrib/ci/lambda_functions/ci.py
+++ b/contrib/ci/lambda_functions/ci.py
@@ -68,6 +68,32 @@
start_pending_job(ec2, job_table, job_id)
+def handle_start_job(event, context):
+ """Handler for ci-start-job function."""
+ ec2 = boto3.client('ec2')
+ dynamodb = boto3.resource('dynamodb')
+
+ job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+
+ job_id = event['job_id']
+ print('received request to start job %s' % job_id)
+
+ start_pending_job(ec2, job_table, job_id)
+
+
+def handle_spot_instance_request_monitor(event, context):
+ """Handler to invoke spot instance request monitor."""
+ ec2 = boto3.client('ec2')
+ dynamodb = boto3.resource('dynamodb')
+ lambda_client = boto3.client('lambda')
+
+ job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+ start_job_function = os.environ['LAMBDA_START_JOB_FUNCTION']
+
+ monitor_spot_instance_requests(ec2, job_table, lambda_client,
+ start_job_function)
+
+
def handle_job_result_s3_artifact(event, context):
"""Handler called when a new S3 object job artifact is uploaded."""
dynamodb = boto3.resource('dynamodb')
@@ -455,14 +481,70 @@
job_id = job['job_id']
print('requesting spot instance for job %s' % job_id)
+ availability_zones = [
+ az['ZoneName']
+ for az in ec2.describe_availability_zones()['AvailabilityZones']
+ if az['State'] == 'available']
+
# Fresh job request.
if job['execution_state'] == 'pending':
# Pick an availability zone randomly.
- availability_zones = [
- az['ZoneName']
- for az in ec2.describe_availability_zones()['AvailabilityZones']
- if az['State'] == 'available']
availability_zone = random.choice(availability_zones)
+
+ # Looks like we previously tried to launch a spot instance for this
+ # job. Examine the state of that request and make sure we can
+ # replace it.
+ elif job['execution_state'] == 'spot-instance-requested':
+ spot_instance_requests = ec2.describe_spot_instance_requests(
+ SpotInstanceRequestIds=[job['spot_instance_request_id']],
+ )['SpotInstanceRequests']
+
+ # This should never happen: the periodic monitor triggers this code
+ # path shortly after the spot request is finalized, while EC2 still
+ # reports the request.
+ if not spot_instance_requests:
+ print('unable to find spot instance request for job %s' % job_id)
+ return
+
+ sir = spot_instance_requests[0]
+
+ # If the spot request expired, try again in the next availability
+ # zone, wrapping around to the first one.
+ if (sir['State'] == 'cancelled'
+ and sir['Status']['Code'] == 'schedule-expired'):
+ print('previous spot request for job %s expired; '
+ 'trying different az' % job_id)
+
+ previous_az = sir['LaunchSpecification']['Placement']['AvailabilityZone']
+ previous_index = availability_zones.index(previous_az)
+ try:
+ availability_zone = availability_zones[previous_index + 1]
+ except IndexError:
+ availability_zone = availability_zones[0]
+ else:
+ print('unhandled spot instance request state for job %s: '
+ '%s; %s: %s' % (
+ job_id,
+ sir['State'],
+ sir['Status']['Code'],
+ sir['Status']['Message']))
+ print('cancelling job %s' % job_id)
+ job_table.update_item(
+ Key={'job_id': job_id},
+ UpdateExpression=(
+ 'set execution_state = :state, '
+ 'cancelled_reason = :reason'
+ ),
+ ExpressionAttributeValues={
+ ':state': 'cancelled',
+ ':reason': 'spot state: %s; %s' % (
+ sir['State'],
+ sir['Status']['Code'],
+ ),
+ },
+ )
+ return
+
else:
print('unhandled execution_state: %s' % job['execution_state'])
return
@@ -485,10 +567,13 @@
# we can just retry the job.
#
# The max bid price is the on-demand price. So in the typical case we save
- # $$$. If we're unlucky we pay the on-demand rate. You can't lose.
+ # $$$. If we're unlucky we pay the on-demand rate. You can't lose. Unless
+ # there are no available spot instances. But we handle this by setting a
+ # short request validity window and retrying in a different availability
+ # zone. Eventually we should find someone willing to satisfy our request.
res = ec2.request_spot_instances(
BlockDurationMinutes=60,
- ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=10),
+ ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=1),
LaunchSpecification=launch_spec,
)
@@ -511,6 +596,55 @@
)
+def monitor_spot_instance_requests(ec2, job_table, lambda_client,
+ start_job_function):
+ """Looks for stale spot instance requests and updates state accordingly."""
+ print('monitoring for stale spot instance requests')
+
+ # We could call the EC2 API directly. But we want DynamoDB to be our
+ # source of truth. So key off its state.
+ res = job_table.query(
+ IndexName='execution_state',
+ KeyConditionExpression=Key('execution_state').eq('spot-instance-requested'),
+ Select='ALL_PROJECTED_ATTRIBUTES',
+ )
+ print('found %d jobs in spot-instance-requested state' % len(res['Items']))
+
+ for item in res['Items']:
+ print('job %s is in spot-instance-requested' % item['job_id'])
+
+ # If the spot instance request is expired, retrigger scheduling.
+ request_id = item['spot_instance_request_id']
+ print('checking state of %s' % request_id)
+ spot_instance_requests = ec2.describe_spot_instance_requests(
+ SpotInstanceRequestIds=[request_id],
+ )['SpotInstanceRequests']
+
+ if not spot_instance_requests:
+ print('could not find %s; weird' % request_id)
+ continue
+
+ sir = spot_instance_requests[0]
+
+ print('spot instance request %s is in state %s: %s' % (
+ request_id, sir['State'], sir['Status']['Code']))
+
+ if (sir['State'] == 'cancelled'
+ and sir['Status']['Code'] == 'schedule-expired'):
+ print('spot instance request %s for job %s has expired; '
+ 'retrying scheduling' % (request_id, item['job_id']))
+
+ payload = json.dumps({
+ 'job_id': item['job_id'],
+ })
+
+ lambda_client.invoke(
+ FunctionName=start_job_function,
+ InvocationType='Event',
+ Payload=payload,
+ )
+
+
def react_to_instance_state_change(job_table, instance, state):
"""React to a CI worker instance state change."""
now = decimal.Decimal(time.time())
diff --git a/contrib/ci/README.rst b/contrib/ci/README.rst
--- a/contrib/ci/README.rst
+++ b/contrib/ci/README.rst
@@ -139,8 +139,36 @@
This function will update job records in DynamoDB to record that a
job has started/finished/aborted.
+This component contains a *start job* Lambda function, which can be
+invoked with the ID of a job that someone wishes to start. It tries to
+ensure the job gets started. If the job's spot instance request expired,
+it requests a new one in a different availability zone.
+
The Terraform code for this component lives in ``job_executor.tf``.
+Spot Instance Request Monitor
+-----------------------------
+
+The *spot instance request monitor* is a component for monitoring the
+state of spot instance requests.
+
+We utilize EC2 spot instances to run jobs. Spot instances, unlike
+on-demand instances, can't be launched directly. Instead, you create
+a request for spot instances and this request is eventually fulfilled
+by EC2, if possible. Often, the request is fulfilled immediately and
+an EC2 instance launches within a few seconds.
+
+But sometimes a spot instance request can't be fulfilled, often because
+no spot capacity is available at that time. This is where this component
+comes in.
+
+The *spot instance request monitor* is a Lambda function that is
+invoked every minute by a CloudWatch Events rule. It scans all jobs
+currently waiting on a spot instance request. If a job's request expired
+before it could be fulfilled, it invokes the *start job* Lambda function
+to reschedule the job. Assuming spot capacity frees up somewhere, the
+job should eventually start.
+
Worker
------
To: indygreg, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel
mailing list