# OS-World cc6eddb
import os
import time
import logging
import volcenginesdkcore
import volcenginesdkautoscaling
import volcenginesdkecs.models as ecs_models
from volcenginesdkcore.rest import ApiException
from volcenginesdkecs.api import ECSApi
from desktop_env.providers.base import Provider
from desktop_env.providers.volcengine.manager import _allocate_vm
# Polling parameters used while waiting for a started instance to come up:
# at most MAX_ATTEMPTS status checks, spaced WAIT_DELAY seconds apart.
MAX_ATTEMPTS = 10
WAIT_DELAY = 15

# Module logger; its level is pinned to INFO so provider progress messages
# are emitted even when the root logger is configured more strictly.
logger = logging.getLogger("desktopenv.providers.volcengine.VolcengineProvider")
logger.setLevel(logging.INFO)
class VolcengineProvider(Provider):
    """Manage desktop VMs that are backed by Volcengine ECS instances.

    In every method ``path_to_vm`` is the ECS instance ID: it is passed
    verbatim as the instance identifier in each ECS API request.
    """

    # Polling interval and overall deadline (both in seconds) used while
    # waiting for the replacement instance in ``revert_to_snapshot``. The
    # deadline keeps an instance stuck in a transitional state from blocking
    # the caller forever; 600 is a deliberately generous bound.
    _REVERT_POLL_INTERVAL = 5
    _REVERT_TIMEOUT = 600

    def __init__(self, **kwargs):
        """Initialise the base provider and build the ECS API client.

        ``self.region`` is read from ``VOLCENGINE_REGION`` and falls back to
        ``"eu-central-1"`` when the variable is unset.
        """
        super().__init__(**kwargs)
        self.region = os.getenv("VOLCENGINE_REGION", "eu-central-1")
        self.client = self._create_client()

    def _create_client(self) -> ECSApi:
        """Configure the SDK from environment variables and return an ECS client.

        Reads ``VOLCENGINE_ACCESS_KEY_ID``, ``VOLCENGINE_SECRET_ACCESS_KEY``
        and ``VOLCENGINE_REGION``. The configuration is installed as the
        SDK-wide default, so it affects every API object created afterwards.
        """
        configuration = volcenginesdkcore.Configuration()
        configuration.ak = os.getenv('VOLCENGINE_ACCESS_KEY_ID')
        configuration.sk = os.getenv('VOLCENGINE_SECRET_ACCESS_KEY')
        # NOTE(review): unlike self.region this has no fallback, so the client
        # region is None when VOLCENGINE_REGION is unset -- confirm intended.
        configuration.region = os.getenv('VOLCENGINE_REGION')
        configuration.client_side_validation = True
        # Install as the default configuration picked up by ECSApi().
        volcenginesdkcore.Configuration.set_default(configuration)
        return ECSApi()

    def _find_instance(self, instance_id: str):
        """Return the description of ``instance_id``, or None if none is listed."""
        response = self.client.describe_instances(
            ecs_models.DescribeInstancesRequest(instance_ids=[instance_id])
        )
        instances = response.instances
        return instances[0] if instances else None

    def _get_instance(self, instance_id: str):
        """Return the description of ``instance_id``.

        Raises:
            Exception: if the API response lists no such instance.
        """
        instance = self._find_instance(instance_id)
        if instance is None:
            raise Exception(f"Instance {instance_id} not found")
        return instance

    @staticmethod
    def _public_ip(instance):
        """Return the instance's EIP address, or None when no EIP is set."""
        eip = instance.eip_address
        return eip.ip_address if eip else None

    def start_emulator(self, path_to_vm: str, headless: bool, *args, **kwargs) -> None:
        """Start the instance if it is stopped and wait until it is running.

        An instance that is already ``RUNNING`` is left alone. An instance in
        any state other than ``STOPPED`` is not started; a warning is logged.
        ``headless`` and the extra positional/keyword arguments are unused.

        Raises:
            ApiException: if an ECS API call fails (logged, then re-raised).
            Exception: if the instance is not found, enters ``ERROR``, or is
                not ``RUNNING`` after ``MAX_ATTEMPTS`` polls spaced
                ``WAIT_DELAY`` seconds apart.
        """
        logger.info("Starting Volcengine VM...")
        try:
            # Check the current instance status.
            status = self._get_instance(path_to_vm).status
            logger.info(f"Instance {path_to_vm} current status: {status}")

            if status == 'RUNNING':
                logger.info(f"Instance {path_to_vm} is already running. Skipping start.")
                return

            if status != 'STOPPED':
                logger.warning(f"Instance {path_to_vm} is in status '{status}' and cannot be started.")
                return

            # Start the instance. StartInstancesRequest is the batch request
            # model, so it must go through the batch start_instances call.
            self.client.start_instances(
                ecs_models.StartInstancesRequest(instance_ids=[path_to_vm])
            )
            logger.info(f"Instance {path_to_vm} is starting...")

            # Wait for the instance to reach RUNNING.
            for _ in range(MAX_ATTEMPTS):
                time.sleep(WAIT_DELAY)
                status = self._get_instance(path_to_vm).status
                if status == 'RUNNING':
                    logger.info(f"Instance {path_to_vm} is now running.")
                    return
                if status == 'ERROR':
                    raise Exception(f"Instance {path_to_vm} failed to start")

            raise Exception(f"Instance {path_to_vm} failed to start within timeout")
        except ApiException as e:
            logger.error(f"Failed to start the Volcengine VM {path_to_vm}: {str(e)}")
            raise

    def get_ip_address(self, path_to_vm: str) -> str:
        """Return the instance's private IP and announce its VNC URL.

        When the instance has a public (EIP) address, the noVNC URL on port
        5910 is logged and printed; otherwise a warning is logged. The return
        value is always the primary IP of the first network interface.

        Raises:
            ApiException: if the describe call fails (logged, then re-raised).
            Exception: if the instance is not found.
        """
        logger.info("Getting Volcengine VM IP address...")
        try:
            instance = self._get_instance(path_to_vm)
            # An instance without an EIP must reach the warning branch below
            # instead of failing on a None attribute access.
            public_ip = self._public_ip(instance)
            private_ip = instance.network_interfaces[0].primary_ip_address

            if public_ip:
                vnc_url = f"http://{public_ip}:5910/vnc.html"
                logger.info("=" * 80)
                logger.info(f"🖥️ VNC Web Access URL: {vnc_url}")
                logger.info(f"📡 Public IP: {public_ip}")
                logger.info(f"🏠 Private IP: {private_ip}")
                logger.info("=" * 80)
                print(f"\n🌐 VNC Web Access URL: {vnc_url}")
                print("📍 Please open the above address in the browser for remote desktop access\n")
            else:
                logger.warning("No public IP address available for VNC access")

            return private_ip
        except ApiException as e:
            logger.error(f"Failed to retrieve IP address for the instance {path_to_vm}: {str(e)}")
            raise

    def save_state(self, path_to_vm: str, snapshot_name: str):
        """Create an image from the instance and return the new image ID.

        Raises:
            ApiException: if image creation fails (logged, then re-raised).
        """
        logger.info("Saving Volcengine VM state...")
        try:
            # Create the image.
            # NOTE(review): snapshot_name is sent as snapshot_id together with
            # instance_id -- confirm this is what the CreateImage API expects.
            response = self.client.create_image(ecs_models.CreateImageRequest(
                snapshot_id=snapshot_name,
                instance_id=path_to_vm,
                description=f"OSWorld snapshot: {snapshot_name}"
            ))
            # Read the field by attribute, like every other SDK response here.
            image_id = response.image_id
            logger.info(f"Image {image_id} created successfully from instance {path_to_vm}.")
            return image_id
        except ApiException as e:
            logger.error(f"Failed to create image from the instance {path_to_vm}: {str(e)}")
            raise

    def revert_to_snapshot(self, path_to_vm: str, snapshot_name: str):
        """Replace the instance with a freshly allocated one and return its ID.

        The current instance is deleted, a new one is obtained from
        ``_allocate_vm()`` and polled until it is ``RUNNING``. Here
        ``snapshot_name`` appears only in log messages.
        NOTE(review): which image the new instance boots from is decided
        inside ``_allocate_vm`` -- verify it matches ``snapshot_name``.

        Raises:
            ApiException: if an ECS API call fails (logged, then re-raised).
            Exception: if the new instance ends up ``STOPPED`` or ``ERROR``.
            TimeoutError: if the new instance is not ``RUNNING`` within
                ``_REVERT_TIMEOUT`` seconds.
        """
        logger.info(f"Reverting Volcengine VM to snapshot: {snapshot_name}...")
        try:
            # Delete the original instance.
            self.client.delete_instance(ecs_models.DeleteInstanceRequest(
                instance_id=path_to_vm,
            ))
            logger.info(f"Old instance {path_to_vm} has been deleted.")

            # Create the replacement instance.
            new_instance_id = _allocate_vm()
            logger.info(f"New instance {new_instance_id} launched from image {snapshot_name}.")
            logger.info(f"Waiting for instance {new_instance_id} to be running...")

            # Wait for the new instance to run, bounded by a deadline.
            deadline = time.monotonic() + self._REVERT_TIMEOUT
            while True:
                # A missing instance is treated as "not ready yet" in case
                # the new ID is not listed immediately after allocation.
                instance = self._find_instance(new_instance_id)
                status = instance.status if instance is not None else None
                if status == 'RUNNING':
                    break
                if status in ('STOPPED', 'ERROR'):
                    raise Exception(f"New instance {new_instance_id} failed to start, status: {status}")
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"New instance {new_instance_id} was not running after "
                        f"{self._REVERT_TIMEOUT} seconds (last status: {status})"
                    )
                time.sleep(self._REVERT_POLL_INTERVAL)

            logger.info(f"Instance {new_instance_id} is ready.")

            # Report the new instance's address. This is best-effort: a
            # failure here must not prevent returning the new instance ID.
            try:
                public_ip = self._public_ip(instance)
                if public_ip:
                    vnc_url = f"http://{public_ip}:5910/vnc.html"
                    logger.info("=" * 80)
                    logger.info(f"🖥️ New Instance VNC Web Access URL: {vnc_url}")
                    logger.info(f"📡 Public IP: {public_ip}")
                    logger.info(f"🆔 New Instance ID: {new_instance_id}")
                    logger.info("=" * 80)
                    print(f"\n🌐 New Instance VNC Web Access URL: {vnc_url}")
                    print("📍 Please open the above address in the browser for remote desktop access\n")
            except Exception as e:
                logger.warning(f"Failed to get VNC address for new instance {new_instance_id}: {e}")

            return new_instance_id
        except ApiException as e:
            logger.error(f"Failed to revert to snapshot {snapshot_name} for the instance {path_to_vm}: {str(e)}")
            raise

    def stop_emulator(self, path_to_vm, region=None) -> None:
        """Terminate the instance by deleting it.

        The instance is deleted, not powered off. ``region`` is unused.

        Raises:
            ApiException: if the delete call fails (logged, then re-raised).
        """
        logger.info(f"Stopping Volcengine VM {path_to_vm}...")
        try:
            self.client.delete_instance(ecs_models.DeleteInstanceRequest(
                instance_id=path_to_vm,
            ))
            logger.info(f"Instance {path_to_vm} has been terminated.")
        except ApiException as e:
            logger.error(f"Failed to stop the Volcengine VM {path_to_vm}: {str(e)}")
            raise