import os
import time
import json
from datetime import datetime, timedelta, timezone
from urllib.parse import unquote

import boto3
from botocore.exceptions import ClientError


# Service principal EventBridge Scheduler uses to assume the target role.
_SCHEDULER_PRINCIPAL = "scheduler.amazonaws.com"

# Seconds to wait after this module creates/modifies IAM resources, so the
# change has a chance to propagate before the role is handed to Scheduler.
_IAM_PROPAGATION_WAIT_SECONDS = 8

# create_schedule retry policy for the "role not assumable yet" validation
# error (IAM eventual consistency): up to ~60s in total.
_CREATE_SCHEDULE_ATTEMPTS = 6
_CREATE_SCHEDULE_RETRY_DELAY_SECONDS = 10


def _scheduler_trust_statement() -> dict:
    """Return a trust-policy statement allowing EventBridge Scheduler to assume a role."""
    return {
        "Effect": "Allow",
        "Principal": {"Service": _SCHEDULER_PRINCIPAL},
        "Action": "sts:AssumeRole",
    }


def _as_list(value) -> list:
    """Normalize an IAM policy element that may be a single value or a list into a list."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def _parse_policy_document(doc) -> dict:
    """Return a policy document as a dict.

    Accepts an already-decoded dict or a (possibly URL-encoded) JSON string;
    anything unparseable yields an empty dict.
    """
    if isinstance(doc, dict):
        return doc
    if isinstance(doc, str) and doc:
        try:
            parsed = json.loads(unquote(doc))
        except ValueError:
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}


def _trust_allows_scheduler(assume_doc) -> bool:
    """Return True if any Allow statement in the trust policy names the Scheduler service principal."""
    doc = _parse_policy_document(assume_doc)
    for stmt in _as_list(doc.get("Statement")):
        if not isinstance(stmt, dict) or stmt.get("Effect") != "Allow":
            continue
        principal = stmt.get("Principal")
        # A string principal (e.g. "*" or an AWS ARN) is not a service principal.
        if not isinstance(principal, dict):
            continue
        if _SCHEDULER_PRINCIPAL in _as_list(principal.get("Service")):
            return True
    return False


def _get_or_create_role(iam, role_name: str, logger):
    """Fetch the IAM role, creating it when missing and auto-create is enabled.

    Returns ``(role, created)``: *role* is the IAM ``Role`` dict, or None when
    it could not be obtained (a warning has already been logged); *created*
    is True when the role may have just been created, so callers should wait
    for IAM propagation.
    """
    try:
        return iam.get_role(RoleName=role_name)["Role"], False
    except ClientError:
        pass

    auto_create = os.getenv('AWS_AUTO_CREATE_SCHEDULER_ROLE', 'true').lower() == 'true'
    if not auto_create:
        logger.warning(f"Scheduler role '{role_name}' not found and auto-create disabled.")
        return None, False

    trust_policy = {"Version": "2012-10-17", "Statement": [_scheduler_trust_statement()]}
    try:
        iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy))
        return iam.get_role(RoleName=role_name)["Role"], True
    except ClientError as ce:
        # If another process created it concurrently (or the read raced the
        # create), fetch again; treat it as fresh so the caller still waits.
        try:
            return iam.get_role(RoleName=role_name)["Role"], True
        except ClientError:
            logger.warning(f"Failed to auto-create scheduler role '{role_name}': {ce}")
            return None, False


def _ensure_trust_policy(iam, role_name: str, role: dict) -> bool:
    """Make sure the role's trust policy lets Scheduler assume it.

    Existing trust statements are preserved; the Scheduler statement is
    appended only when no Allow statement already covers it.
    Returns True if the trust policy was updated.
    """
    doc = _parse_policy_document(role.get("AssumeRolePolicyDocument"))
    if _trust_allows_scheduler(doc):
        return False
    statements = [s for s in _as_list(doc.get("Statement")) if isinstance(s, dict)]
    statements.append(_scheduler_trust_statement())
    new_doc = {"Version": doc.get("Version", "2012-10-17"), "Statement": statements}
    iam.update_assume_role_policy(RoleName=role_name, PolicyDocument=json.dumps(new_doc))
    return True


def _ensure_inline_policy(iam, role_name: str) -> bool:
    """Attach the minimal EC2 terminate/describe inline policy if it is absent.

    Returns True if the policy was (re)written.
    """
    inline_name = f"{role_name}-inline"
    try:
        iam.get_role_policy(RoleName=role_name, PolicyName=inline_name)
    except ClientError:
        pass
    else:
        return False
    inline_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["ec2:TerminateInstances", "ec2:DescribeInstances"],
                "Resource": "*",
            }
        ],
    }
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=inline_name,
        PolicyDocument=json.dumps(inline_policy),
    )
    return True


def _resolve_scheduler_role_arn(logger) -> str:
    """Return the IAM role ARN EventBridge Scheduler should use, or '' if unavailable.

    Resolution order:
      1. ``AWS_SCHEDULER_ROLE_ARN`` if set (used verbatim, no IAM calls).
      2. The role named by ``AWS_SCHEDULER_ROLE_NAME`` (default
         ``osworld-scheduler-ec2-terminate``), created when missing unless
         ``AWS_AUTO_CREATE_SCHEDULER_ROLE`` is not 'true'. Its trust policy
         and inline EC2 policy are repaired if needed.

    Best-effort: any failure is logged as a warning and '' is returned.
    """
    # 1) Explicit env takes precedence
    role_arn = os.getenv('AWS_SCHEDULER_ROLE_ARN', '').strip()
    if role_arn:
        return role_arn

    # 2) Derive from role name + account id
    role_name = os.getenv('AWS_SCHEDULER_ROLE_NAME', 'osworld-scheduler-ec2-terminate').strip()
    try:
        account_id = boto3.client('sts').get_caller_identity()['Account']
        fallback_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
        iam = boto3.client('iam')

        role, changed = _get_or_create_role(iam, role_name, logger)
        if role is None:
            return ''
        changed = _ensure_trust_policy(iam, role_name, role) or changed
        changed = _ensure_inline_policy(iam, role_name) or changed

        if changed:
            # Wait for IAM propagation only when we actually modified IAM.
            time.sleep(_IAM_PROPAGATION_WAIT_SECONDS)

        # Prefer the ARN IAM reports: it carries the correct partition and
        # any role path, which a hand-built ARN would miss.
        derived_arn = role.get("Arn") or fallback_arn
        logger.info(f"Derived AWS_SCHEDULER_ROLE_ARN={derived_arn} from role name '{role_name}'")
        return derived_arn
    except Exception as e:
        logger.warning(f"Failed to resolve Scheduler Role ARN: {e}")
        return ''


def _is_role_propagation_error(err: ClientError) -> bool:
    """Return True if Scheduler rejected the role because it cannot assume it (yet)."""
    error = err.response.get('Error', {})
    return (
        error.get('Code') == 'ValidationException'
        and 'must allow AWS EventBridge Scheduler to assume the role' in error.get('Message', '')
    )


def schedule_instance_termination(region: str, instance_id: str, ttl_seconds: int, role_arn: str, logger) -> None:
    """Create a one-shot EventBridge Scheduler schedule that terminates an EC2 instance.

    Args:
        region: AWS region for the Scheduler client.
        instance_id: EC2 instance to terminate.
        ttl_seconds: Delay from now (UTC) until termination.
        role_arn: Role Scheduler assumes; when empty it is resolved via
            ``_resolve_scheduler_role_arn``. If still empty, nothing is scheduled.
        logger: Logger used for progress messages.

    Raises:
        botocore.exceptions.ClientError: if create_schedule fails with a
            non-retryable error, or the role-propagation error persists after
            all retries.
    """
    if not role_arn:
        role_arn = _resolve_scheduler_role_arn(logger)
    if not role_arn:
        logger.info("Scheduler role ARN not available; skipping TTL schedule creation.")
        return

    scheduler_client = boto3.client('scheduler', region_name=region)
    schedule_name = f"osworld-ttl-{instance_id}-{int(time.time())}"
    eta_scheduler = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds)
    schedule_expression = f"at({eta_scheduler.strftime('%Y-%m-%dT%H:%M:%S')})"
    target_arn = "arn:aws:scheduler:::aws-sdk:ec2:terminateInstances"
    # Serialize properly instead of string concatenation; compact separators
    # keep the payload identical to the previous hand-built form.
    input_payload = json.dumps({"InstanceIds": [instance_id]}, separators=(',', ':'))

    # Retry to tolerate IAM eventual consistency
    for attempt in range(1, _CREATE_SCHEDULE_ATTEMPTS + 1):
        try:
            scheduler_client.create_schedule(
                Name=schedule_name,
                ScheduleExpression=schedule_expression,
                # The at() timestamp above is formatted from a UTC datetime.
                ScheduleExpressionTimezone='UTC',
                FlexibleTimeWindow={"Mode": "OFF"},
                ActionAfterCompletion='DELETE',
                Target={
                    "Arn": target_arn,
                    "RoleArn": role_arn,
                    "Input": input_payload,
                },
                State='ENABLED',
                Description=f"OSWorld TTL terminate for {instance_id}",
            )
        except ClientError as e:
            # Only the propagation error is retried, and never after the last
            # attempt (no pointless sleep before re-raising).
            if attempt < _CREATE_SCHEDULE_ATTEMPTS and _is_role_propagation_error(e):
                time.sleep(_CREATE_SCHEDULE_RETRY_DELAY_SECONDS)
                continue
            raise
        logger.info(f"Scheduled EC2 termination via EventBridge Scheduler: name={schedule_name}, when={eta_scheduler.isoformat()} (UTC)")
        return