D6926: ci: retry expired spot instance requests

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Tue Oct 1 04:41:27 UTC 2019


indygreg updated this revision to Diff 16723.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D6926?vs=16720&id=16723

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D6926/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D6926

AFFECTED FILES
  contrib/ci/README.rst
  contrib/ci/lambda_functions/ci.py
  contrib/ci/lambda_functions/web.py
  contrib/ci/terraform/job_executor.tf
  contrib/ci/terraform/spot_instance_request_monitor.tf
  contrib/ci/terraform/storage.tf

CHANGE DETAILS

diff --git a/contrib/ci/terraform/storage.tf b/contrib/ci/terraform/storage.tf
--- a/contrib/ci/terraform/storage.tf
+++ b/contrib/ci/terraform/storage.tf
@@ -55,8 +55,21 @@
     name = "job_id"
     type = "S"
   }
+  attribute {
+    name = "execution_state"
+    type = "S"
+  }
 
   hash_key = "job_id"
+
+  # This allows us to easily query for jobs in a specific state.
+  global_secondary_index {
+    name = "execution_state"
+    hash_key = "execution_state"
+    range_key = "job_id"
+    projection_type = "ALL"
+  }
+
 }
 
 # Tracks results for individual tests in each job.
diff --git a/contrib/ci/terraform/spot_instance_request_monitor.tf b/contrib/ci/terraform/spot_instance_request_monitor.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/spot_instance_request_monitor.tf
@@ -0,0 +1,94 @@
+# Defines resources to monitor spot instance requests.
+
+resource "aws_cloudwatch_log_group" "lambda_ci_spot_instance_request_monitor" {
+  name = "/aws/lambda/${aws_lambda_function.ci_spot_instance_request_monitor.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_spot_instance_request_monitor" {
+  name = "lambda-ci-spot-instance-request-monitor"
+  description = "For Lambda function monitoring spot instance requests"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Function that monitors spot instance requests and retries failed ones.
+resource "aws_lambda_function" "ci_spot_instance_request_monitor" {
+  function_name = "ci-spot-instance-request-monitor"
+  description = "Monitors spot instance requests and triggers activity"
+  filename = data.archive_file.lambda_ci.output_path
+  handler = "ci.handle_spot_instance_request_monitor"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime = "python3.7"
+  timeout = 60
+  role = aws_iam_role.lambda_ci_spot_instance_request_monitor.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+      LAMBDA_START_JOB_FUNCTION = aws_lambda_function.ci_start_job.function_name
+    }
+  }
+}
+
+data "aws_iam_policy_document" "lambda_ci_spot_instance_request_monitor" {
+  # Allow Lambda function to write CloudWatch logs.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [aws_cloudwatch_log_group.lambda_ci_spot_instance_request_monitor.arn]
+  }
+  # Allow querying spot instance requests.
+  statement {
+    effect = "Allow"
+    actions = [
+      "ec2:DescribeSpotInstanceRequests",
+    ]
+    resources = ["*"]
+  }
+  # Allow fetching job state from DynamoDB.
+  statement {
+    effect = "Allow"
+    actions = [
+      "dynamodb:Query",
+    ]
+    resources = [
+      aws_dynamodb_table.ci_job.arn,
+      "${aws_dynamodb_table.ci_job.arn}/*",
+    ]
+  }
+  # Allow invoking the start job Lambda function.
+  statement {
+    effect = "Allow"
+    actions = ["lambda:InvokeFunction"]
+    resources = [aws_lambda_function.ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_spot_instance_request_monitor" {
+  role = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  name = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  policy = data.aws_iam_policy_document.lambda_ci_spot_instance_request_monitor.json
+}
+
+# Periodically trigger the Lambda function so state is continuously monitored.
+resource "aws_cloudwatch_event_rule" "trigger_ci_spot_instance_request_monitor" {
+  name = "trigger-ci-spot-instance-request-monitor"
+  description = "Trigger monitoring of spot instance requests"
+  schedule_expression = "rate(1 minute)"
+}
+
+resource "aws_cloudwatch_event_target" "ci_spot_instance_request_monitor" {
+  rule = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.name
+  arn = aws_lambda_function.ci_spot_instance_request_monitor.arn
+}
+
+resource "aws_lambda_permission" "ci_spot_instance_request_monitor_allow_cloudwatch" {
+  statement_id = "AllowExecutionFromCloudWatch"
+  action = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.ci_spot_instance_request_monitor.function_name
+  principal = "events.amazonaws.com"
+  source_arn = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.arn
+}
diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf
--- a/contrib/ci/terraform/job_executor.tf
+++ b/contrib/ci/terraform/job_executor.tf
@@ -204,3 +204,60 @@
   principal = "events.amazonaws.com"
   source_arn = aws_cloudwatch_event_rule.trigger_instance_state_change.arn
 }
+
+resource "aws_cloudwatch_log_group" "lambda_ci_start_job" {
+  name = "/aws/lambda/${aws_lambda_function.ci_start_job.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_start_job" {
+  name = "lambda-ci-start-job"
+  description = "For Lambda function to trigger job start"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Lambda function for starting a job. This is similar to `run_pending_job`
+# except it isn't a handler for SQS events.
+resource "aws_lambda_function" "ci_start_job" {
+  function_name = "ci-start-job"
+  description = "Starts a scheduled CI job"
+  filename = data.archive_file.lambda_ci.output_path
+  handler = "ci.handle_start_job"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime = "python3.7"
+  timeout = 60
+  role = aws_iam_role.lambda_ci_start_job.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+    }
+  }
+}
+
+# Inherit the policy from run_pending_job.
+resource "aws_iam_role_policy" "lambda_ci_start_job_run_pending_job" {
+  role = aws_iam_role.lambda_ci_start_job.name
+  name = "run-pending-job"
+  policy = data.aws_iam_policy_document.lambda_ci_run_pending_job.json
+}
+
+# Supplement the inherited policy with an additional one.
+data "aws_iam_policy_document" "lambda_ci_start_job" {
+  # Allow Lambda function to write CloudWatch logs.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [
+      aws_cloudwatch_log_group.lambda_ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_start_job" {
+  role = aws_iam_role.lambda_ci_start_job.name
+  name = aws_iam_role.lambda_ci_start_job.name
+  policy = data.aws_iam_policy_document.lambda_ci_start_job.json
+}
diff --git a/contrib/ci/lambda_functions/web.py b/contrib/ci/lambda_functions/web.py
--- a/contrib/ci/lambda_functions/web.py
+++ b/contrib/ci/lambda_functions/web.py
@@ -164,7 +164,8 @@
                     else:
                         skip_count = 'n/a'
 
-                    if job_info['execution_state'] in ('pending', 'spot-instance-requested', 'running'):
+                    if job_info['execution_state'] in (
+                        'pending', 'spot-instance-requested', 'cancelled', 'running'):
                         job_state = job_info['execution_state']
                     elif job_info['execution_state'] == 'done':
                         exit_clean = job_info.get('exit_clean')
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
--- a/contrib/ci/lambda_functions/ci.py
+++ b/contrib/ci/lambda_functions/ci.py
@@ -68,6 +68,32 @@
         start_pending_job(ec2, job_table, job_id)
 
 
+def handle_start_job(event, context):
+    """Handler for ci-start-job function."""
+    ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+
+    job_id = event['job_id']
+    print('received request to start job %s' % job_id)
+
+    start_pending_job(ec2, job_table, job_id)
+
+
+def handle_spot_instance_request_monitor(event, context):
+    """Handler to invoke spot instance request monitor."""
+    ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+    lambda_client = boto3.client('lambda')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+    start_job_function = os.environ['LAMBDA_START_JOB_FUNCTION']
+
+    monitor_spot_instance_requests(ec2, job_table, lambda_client,
+                                   start_job_function)
+
+
 def handle_job_result_s3_artifact(event, context):
     """Handler called when a new S3 object job artifact is uploaded."""
     dynamodb = boto3.resource('dynamodb')
@@ -455,14 +481,70 @@
     job_id = job['job_id']
     print('requesting spot instance for job %s' % job_id)
 
+    availability_zones = [
+        az['ZoneName']
+        for az in ec2.describe_availability_zones()['AvailabilityZones']
+        if az['State'] == 'available']
+
     # Fresh job request.
     if job['execution_state'] == 'pending':
         # Pick an availability zone randomly.
-        availability_zones = [
-            az['ZoneName']
-            for az in ec2.describe_availability_zones()['AvailabilityZones']
-            if az['State'] == 'available']
         availability_zone = random.choice(availability_zones)
+
+    # Looks like we previously tried to launch a spot instance for this
+    # job. Examine the state of that request and make sure we can
+    # replace it.
+    elif job['execution_state'] == 'spot-instance-requested':
+        spot_instance_requests = ec2.describe_spot_instance_requests(
+            SpotInstanceRequestIds=[job['spot_instance_request_id']],
+        )['SpotInstanceRequests']
+
+        # This should never happen assuming this code path doesn't run after
+        # the spot request was finalized, which should never happen, since
+        # we trigger things periodically.
+        if not spot_instance_requests:
+            print('unable to find spot instance request for job %s' % job_id)
+            return
+
+        sir = spot_instance_requests[0]
+
+        # If the spot request expired, try again in the next availability
+        # zone.
+        if (sir['State'] == 'cancelled'
+            and sir['Status']['Code'] == 'schedule-expired'):
+            print('previous spot request for job %s expired; '
+                  'trying different az' % job_id)
+
+            previous_az = sir['LaunchSpecification']['Placement']['AvailabilityZone']
+            previous_index = availability_zones.index(previous_az)
+            try:
+                availability_zone = availability_zones[previous_index + 1]
+            except IndexError:
+                availability_zone = availability_zones[0]
+        else:
+            print('unhandled spot instance request state for job %s: '
+                  '%s; %s: %s' % (
+                job_id,
+                sir['State'],
+                sir['Status']['Code'],
+                sir['Status']['Message']))
+            print('cancelling job %s' % job_id)
+            job_table.update_item(
+                Key={'job_id': job_id},
+                UpdateExpression=(
+                    'set execution_state = :state, '
+                    'cancelled_reason = :reason'
+                ),
+                ExpressionAttributeValues={
+                    ':state': 'cancelled',
+                    ':reason': 'spot state: %s; %s' % (
+                        sir['State'],
+                        sir['Status']['Code'],
+                    ),
+                },
+            )
+            return
+
     else:
         print('unhandled execution_state: %s' % job['execution_state'])
         return
@@ -485,10 +567,13 @@
     # we can just retry the job.
     #
     # The max bid price is the on-demand price. So in the typical case we save
-    # $$$. If we're unlucky we pay the on-demand rate. You can't lose.
+    # $$$. If we're unlucky we pay the on-demand rate. You can't lose. Unless
+    # there are no available spot instances. But we handle this by setting a
+    # short request validity window and retrying in a different availability
+    # zone. Eventually we should find someone willing to satisfy our request.
     res = ec2.request_spot_instances(
         BlockDurationMinutes=60,
-        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=10),
+        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=1),
         LaunchSpecification=launch_spec,
     )
 
@@ -511,6 +596,55 @@
     )
 
 
+def monitor_spot_instance_requests(ec2, job_table, lambda_client,
+                                   start_job_function):
+    """Looks for stale spot instance requests and updates state accordingly."""
+    print('monitoring for stale spot instance requests')
+
+    # We could call the EC2 API directly. But we want DynamoDB to be our
+    # source of truth. So key off its state.
+    res = job_table.query(
+        IndexName='execution_state',
+        KeyConditionExpression=Key('execution_state').eq('spot-instance-requested'),
+        Select='ALL_PROJECTED_ATTRIBUTES',
+    )
+    print('found %d jobs in spot-instance-requested state' % len(res['Items']))
+
+    for item in res['Items']:
+        print('job %s is in spot-instance-requested' % item['job_id'])
+
+        # If the spot instance request is expired, retrigger scheduling.
+        request_id = item['spot_instance_request_id']
+        print('checking state of %s' % request_id)
+        spot_instance_requests = ec2.describe_spot_instance_requests(
+            SpotInstanceRequestIds=[request_id],
+        )['SpotInstanceRequests']
+
+        if not spot_instance_requests:
+            print('could not find %s; weird' % request_id)
+            continue
+
+        sir = spot_instance_requests[0]
+
+        print('spot instance request %s is in state %s: %s' % (
+            request_id, sir['State'], sir['Status']['Code']))
+
+        if (sir['State'] == 'cancelled'
+            and sir['Status']['Code'] == 'schedule-expired'):
+            print('spot instance request %s for job %s has expired; '
+                  'retrying scheduling' % (request_id, item['job_id']))
+
+            payload = json.dumps({
+                'job_id': item['job_id'],
+            })
+
+            lambda_client.invoke(
+                FunctionName=start_job_function,
+                InvocationType='Event',
+                Payload=payload,
+            )
+
+
 def react_to_instance_state_change(job_table, instance, state):
     """React to a CI worker instance state change."""
     now = decimal.Decimal(time.time())
diff --git a/contrib/ci/README.rst b/contrib/ci/README.rst
--- a/contrib/ci/README.rst
+++ b/contrib/ci/README.rst
@@ -139,8 +139,36 @@
 This function will update job records in DynamoDB to record that a
 job has started/finished/aborted.
 
+This component contains a *start job* Lambda function, which can be
+invoked with the Job ID of a job that someone wishes to start. It
+tries to ensure the job has run. In the case of an expired spot instance
+request, it will try again.
+
 The Terraform code for this component lives in ``job_executor.tf``.
 
+Spot Instance Request Monitor
+-----------------------------
+
+The *spot instance request monitor* is a component for monitoring the
+state of spot instance requests.
+
+We utilize EC2 spot instances to run jobs. Spot instances, unlike
+on-demand instances, can't be launched directly. Instead, you create
+a request for spot instances and this request is eventually fulfilled
+by EC2, if possible. Often, the request is fulfilled immediately and
+an EC2 instance launches within a few seconds.
+
+But sometimes a spot instance request fails. This is often due to no
+spot instances being available at that time. This is where this component
+plays a part.
+
+The *spot instance request monitor* is a Lambda function that is
+periodically invoked via a CloudWatch Event. It scans all jobs currently
+waiting on a spot instance request. If a job's spot instance request
+expired without being fulfilled, the monitor invokes the *start job*
+Lambda function to reschedule that job. Theoretically, the job should
+eventually start.
+
 Worker
 ------
 



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list