mirrored 20 minutes ago
0
HappySixChange resolution before saving snapshot in VirtualBox (#47) * Initailize aws support * Add README for the VM server * Refactor OSWorld for supporting more cloud services. * Initialize vmware and aws implementation v1, waiting for verification * Initlize files for azure, gcp and virtualbox support * Debug on the VMware provider * Fix on aws interface mapping * Fix instance type * Refactor * Clean * Add Azure provider * hk region; debug * Fix lock * Remove print * Remove key_name requirements when allocating aws vm * Clean README * Fix reset * Fix bugs * Add VirtualBox and Azure providers * Add VirtualBox OVF link * Raise exception on macOS host * Init RAEDME for VBox * Update VirtualBox VM download link * Update requirements and setup.py; Improve robustness on Windows * Fix network adapter * Go through on Windows machine * Add default adapter option * Fix minor error * Change resolution before creating snapshot * Fix small error * Change default provider option --------- Co-authored-by: Timothyxxx <384084775@qq.com> Co-authored-by: XinyuanWangCS <xywang626@gmail.com> Co-authored-by: Tianbao Xie <47296835+Timothyxxx@users.noreply.github.com>16c3def
import logging
import platform
import subprocess
import time
import os
from desktop_env.providers.base import Provider
import xml.etree.ElementTree as ET

logger = logging.getLogger("desktopenv.providers.virtualbox.VirtualBoxProvider")
logger.setLevel(logging.INFO)

WAIT_TIME = 3

# Note: Windows will not add command VBoxManage to PATH by default. Please add the folder where VBoxManage executable is in (Default should be "C:\Program Files\Oracle\VirtualBox" for Windows) to PATH.

class VirtualBoxProvider(Provider):
    @staticmethod
    def _execute_command(command: list):
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=60, text=True,
                                encoding="utf-8")
        if result.returncode != 0:
            raise Exception("\033[91m" + result.stdout + result.stderr + "\033[0m")
        return result.stdout.strip()
    
    @staticmethod
    def _get_vm_uuid(path_to_vm: str):
        try:
            output = subprocess.check_output(f"VBoxManage list vms", shell=True, stderr=subprocess.STDOUT)
            output = output.decode()
            output = output.splitlines()
            if path_to_vm.endswith('.vbox'):
                # Load and parse the XML content from the file
                tree = ET.parse(path_to_vm)
                root = tree.getroot()
                
                # Find the <Machine> element and retrieve its 'uuid' attribute
                machine_element = root.find('.//{http://www.virtualbox.org/}Machine')
                if machine_element is not None:
                    uuid = machine_element.get('uuid')[1:-1]
                    return uuid
                else:
                    logger.error(f"UUID not found in file {path_to_vm}")
                    raise
            elif any(line.split()[1] == "{" + path_to_vm + "}" for line in output):
                logger.info(f"Got valid UUID {path_to_vm}.")
                return path_to_vm
            else:
                for line in output:
                    if line.split()[0] == '"' + path_to_vm + '"':
                        uuid = line.split()[1][1:-1]
                        return uuid
                logger.error(f"The path you provided does not match any of the \".vbox\" file, name, or UUID of VM.")
                raise
        except subprocess.CalledProcessError as e:
                logger.error(f"Error executing command: {e.output.decode().strip()}")
            

    def start_emulator(self, path_to_vm: str, headless: bool):
        logger.info("Starting VirtualBox VM...")

        while True:
            try:
                uuid = VirtualBoxProvider._get_vm_uuid(path_to_vm)
                output = subprocess.check_output(f"VBoxManage list runningvms", shell=True, stderr=subprocess.STDOUT)
                output = output.decode()
                output = output.splitlines()

                if any(line.split()[1] == "{" + uuid + "}" for line in output):
                    logger.info("VM is running.")
                    break
                else:
                    logger.info("Starting VM...")
                    VirtualBoxProvider._execute_command(["VBoxManage", "startvm", uuid]) if not headless else \
                    VirtualBoxProvider._execute_command(
                            ["VBoxManage", "startvm", uuid, "--type", "headless"])
                    time.sleep(WAIT_TIME)

            except subprocess.CalledProcessError as e:
                logger.error(f"Error executing command: {e.output.decode().strip()}")

    def get_ip_address(self, path_to_vm: str) -> str:
        logger.info("Getting VirtualBox VM IP address...")
        while True:
            try:
                uuid = VirtualBoxProvider._get_vm_uuid(path_to_vm)
                output = VirtualBoxProvider._execute_command(
                    ["VBoxManage", "guestproperty", "get", uuid, "/VirtualBox/GuestInfo/Net/0/V4/IP"]
                )
                result = output.split()[1]
                if result != "value":
                    logger.info(f"VirtualBox VM IP address: {result}")
                    return result
                else:
                    logger.error("VM IP address not found. Have you installed the guest additions?")
                    raise
            except Exception as e:
                logger.error(e)
                time.sleep(WAIT_TIME)
                logger.info("Retrying to get VirtualBox VM IP address...")

    def save_state(self, path_to_vm: str, snapshot_name: str):
        logger.info("Saving VirtualBox VM state...")
        uuid = VirtualBoxProvider._get_vm_uuid(path_to_vm)
        VirtualBoxProvider._execute_command(["VBoxManage", "snapshot", uuid, "take", snapshot_name])
        time.sleep(WAIT_TIME)  # Wait for the VM to save

    def revert_to_snapshot(self, path_to_vm: str, snapshot_name: str):
        logger.info(f"Reverting VirtualBox VM to snapshot: {snapshot_name}...")
        uuid = VirtualBoxProvider._get_vm_uuid(path_to_vm)
        VirtualBoxProvider._execute_command(["VBoxManage", "controlvm", uuid, "savestate"])
        time.sleep(WAIT_TIME)  # Wait for the VM to stop
        VirtualBoxProvider._execute_command(["VBoxManage", "snapshot", uuid, "restore", snapshot_name])
        time.sleep(WAIT_TIME)  # Wait for the VM to revert
        return path_to_vm

    def stop_emulator(self, path_to_vm: str):
        logger.info("Stopping VirtualBox VM...")
        uuid = VirtualBoxProvider._get_vm_uuid(path_to_vm)
        VirtualBoxProvider._execute_command(["VBoxManage", "controlvm", uuid, "savestate"])
        time.sleep(WAIT_TIME)  # Wait for the VM to stop