# mirrored 14 minutes ago
# 0
# yuanmengqiclean code 7315aec
import base64
import logging
import os
import re
import tempfile
import time
from io import BytesIO
from typing import Dict, List

from PIL import Image
from openai import OpenAI, APIError, RateLimitError, Timeout
from typing import Any, Optional, Union, Tuple

from mm_agents.prompts import SYS_PROMPT_IN_SCREENSHOT_OUT_CODE, SYS_PROMPT_IN_SCREENSHOT_OUT_ACTION, \
    SYS_PROMPT_IN_A11Y_OUT_CODE, SYS_PROMPT_IN_A11Y_OUT_ACTION, \
    SYS_PROMPT_IN_BOTH_OUT_CODE, SYS_PROMPT_IN_BOTH_OUT_ACTION, \
    SYS_PROMPT_IN_SOM_OUT_TAG

logger = logging.getLogger("desktopenv.agent")

pure_text_settings = ['a11y_tree']

# Accessibility-tree XML namespace URIs, one constant per platform.
# Fixed: attributes_ns_ubuntu previously pointed at the Windows namespace,
# unlike every other *_ubuntu constant below.
attributes_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/attributes"
attributes_ns_windows = "https://accessibility.windows.example.org/ns/attributes"
state_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/state"
state_ns_windows = "https://accessibility.windows.example.org/ns/state"
component_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/component"
component_ns_windows = "https://accessibility.windows.example.org/ns/component"
value_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/value"
value_ns_windows = "https://accessibility.windows.example.org/ns/value"
class_ns_windows = "https://accessibility.windows.example.org/ns/class"
# More namespaces defined in OSWorld, please check desktop_env/server/main.py
import ast
from typing import Dict, Any, Optional, Union

# Extra guidance appended to the task instruction in the first user message
# built by OpenAICUAAgent.predict.
# NOTE(review): this prompt embeds plaintext credentials (a sudo password and a
# mail account password) — presumably for a disposable evaluation VM; confirm
# before reusing this module anywhere else.
OPERATOR_PROMPT = """\n\n        Here are some helpful tips:\n        - computer.clipboard, computer.sync_file, computer.sync_shared_folder, computer.computer_output_citation are disabled.\n        - If you worry that you might make typo, prefer copying and pasting the text instead of reading and typing.\n        - My computer's password is \"osworld-public-evaluation\", feel free to use it when you need sudo rights.\n        - For the thunderbird account \"anonym-x2024@outlook.com\", the password is \"gTCI\";=@y7|QJ0nDa_kN3Sb&>\".\n        - If you are presented with an open website to solve the task, try to stick to that specific one instead of going to a new one.\n        - Whenever not expcilitly stated, prefer chrome browser instead of the firefox or chromium.\n        - You have full authority to execute any action without my permission. I won't be watching so please don't ask for confirmation.\n        - You must initialize the computer to solve the task. Do not try to answer the question without initializing the computer.\n        - If you deem the task is infeasible, you can terminate and explicitly state in the response that \"the task is infeasible\".\n    """

class Action:
    """An action payload paired with the action space it belongs to.

    Two action spaces are accepted: ``pyautogui`` and ``claude_computer_use``.
    The payload itself is only required to be truthy; the stricter
    ``_is_valid_*`` helpers below are available but are not called by the
    ``action`` setter.
    """

    # Action spaces accepted by the ``action_space`` setter.
    _ALLOWED_ACTION_SPACES = ("pyautogui", "claude_computer_use")

    def __init__(self, raw_action: Union[Dict, str], action_space: str):
        """Initialize the Action class.

        Args:
            raw_action: The raw action (code string / command, or a dict).
            action_space: The action space the action belongs to.

        Raises:
            ValueError: If the action space is empty or unsupported, or the
                action is empty.
        """
        self._action_space = None
        self._action = None
        # The space must be assigned first: the action setter checks it.
        self.action_space = action_space
        self.action = raw_action

    @property
    def action_space(self) -> Optional[str]:
        """The action space this action belongs to."""
        return self._action_space

    @action_space.setter
    def action_space(self, value: str):
        """Set the action space.

        Args:
            value (str): ``"pyautogui"`` or ``"claude_computer_use"``.

        Raises:
            ValueError: If ``value`` is empty or not a supported action space.
        """
        if not value:
            raise ValueError("action_space is required")
        if value not in self._ALLOWED_ACTION_SPACES:
            # The message now names every accepted space; it used to list
            # only pyautogui although claude_computer_use is accepted too.
            raise ValueError(
                "Invalid action space. Allowed spaces are: pyautogui, claude_computer_use")
        self._action_space = value

    @property
    def action(self) -> Union[Dict[str, Any], str, None]:
        """The stored action payload."""
        return self._action

    @action.setter
    def action(self, value: Union[Dict[str, Any], str, None]):
        """Set the action payload.

        The payload is stored as given once it is non-empty and the action
        space is one of the supported ones; no format validation is applied.

        Args:
            value (str | dict): The action to set.

        Raises:
            ValueError: If the action is empty, or the action space has not
                been set to a supported value.
        """
        if not value:
            raise ValueError("action cannot be empty")

        if self._action_space not in self._ALLOWED_ACTION_SPACES:
            raise ValueError(
                f"Invalid action space: {self._action_space}, allowed spaces are: pyautogui, claude_computer_use")
        self._action = value

    def __str__(self) -> str:
        """Return a string showing the action space and the action value."""
        return f"Action(action_space='{self._action_space}', action='{self._action}')"

    def get_action(self) -> Union[Dict[str, Any], str, None]:
        """Get the action.

        Returns:
            The stored action payload.
        """
        return self._action

    def to_dict(self) -> Dict[str, Any]:
        """Convert the action to a dictionary.

        Returns:
            dict: ``{"action_space": ..., "action": ...}``.
        """
        return {"action_space": self._action_space, "action": self._action}

    def _is_valid_python_code(self, code: str) -> bool:
        """Check that the given string parses as Python source.

        Args:
            code (str): The code string to validate.

        Returns:
            bool: True when the code parses.

        Raises:
            ValueError: If the code has a syntax error (this helper raises
                instead of returning False).
        """
        try:
            ast.parse(code)
        except SyntaxError as err:
            raise ValueError("Invalid Python code syntax") from err
        return True

    def _is_valid_claude_computer_use_action(self, action: Dict[str, Any]) -> bool:
        """Check that an action has the shape used by claude_computer_use.

        Args:
            action: The action to validate.

        Returns:
            bool: True when the action is a dict with truthy "name", "input"
            and "id" entries.

        Raises:
            ValueError: If the action is not a dict or one of the required
                entries is missing or falsy (this helper raises instead of
                returning False).
        """
        if not isinstance(action, dict):
            raise ValueError("Invalid action format for claude_computer_use")
        if not all(action.get(field) for field in ("name", "input", "id")):
            raise ValueError(
                "Invalid action format for claude_computer_use, 'name', 'input' and 'id' are required")
        return True

class Timer:
    """Context manager that measures wall-clock time spent in a ``with`` body.

    ``start`` is the ``time.time()`` reading taken on entry; ``duration`` is
    set on exit to the number of seconds elapsed since ``start``.
    """

    def __enter__(self):
        entered_at = time.time()
        self.start = entered_at
        return self

    def __exit__(self, *exc_info):
        # Exceptions are not suppressed: the implicit None return lets them propagate.
        left_at = time.time()
        self.duration = left_at - self.start

# Function to encode the image
def encode_image(image_content):
    """Return the Base64 encoding of ``image_content`` (bytes) as a str."""
    encoded_bytes = base64.b64encode(image_content)
    return str(encoded_bytes, 'utf-8')


def encoded_img_to_pil_img(data_str):
    """Decode a Base64 image string into a PIL image.

    Every occurrence of the ``data:image/png;base64,`` marker is removed
    before decoding, so both bare Base64 and PNG data URIs are accepted.
    """
    # split + join drops the marker wherever it appears, like str.replace.
    payload = "".join(data_str.split("data:image/png;base64,"))
    return Image.open(BytesIO(base64.b64decode(payload)))


def save_to_tmp_img_file(data_str):
    """Decode a Base64 image string and save it as a PNG in a new temp dir.

    Args:
        data_str: Base64 image data, optionally prefixed with
            ``data:image/png;base64,``.

    Returns:
        Path of the written file, ``<new temp dir>/tmp_img.png``. The temporary
        directory is not removed here; cleanup is left to the caller.
    """
    # Reuse the shared decoder instead of duplicating the prefix handling, and
    # close the image once it has been written.
    with encoded_img_to_pil_img(data_str) as image:
        tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
        image.save(tmp_img_path)

    return tmp_img_path


class OpenAICUAAgent:
    def __init__(
            self,
            env,
            platform="ubuntu",
            model="computer-use-preview",
            max_tokens=1500,
            top_p=0.9,
            temperature=0.5,
            action_space="pyautogui",
            observation_type="screenshot_a11y_tree",
            # observation_type can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"]
            max_trajectory_length=100,
            a11y_tree_max_tokens=10000
    ):
        self.env = env
        self.platform = platform
        self.model = model
        self.max_tokens = max_tokens
        self.top_p = top_p
        self.temperature = temperature
        self.action_space = action_space
        self.observation_type = observation_type
        self.max_trajectory_length = max_trajectory_length
        self.a11y_tree_max_tokens = a11y_tree_max_tokens
        self.cua_messages : List[Dict] = []

        self.thoughts = []
        self.actions = []
        self.observations = []

        self.tools = [{
            "type": "computer_use_preview",
            "display_width": 1920,
            "display_height": 1080,
            "environment": "linux" if platform == "ubuntu" else "windows"
        }]

        if observation_type == "screenshot":
            if action_space == "computer_13":
                self.system_message = SYS_PROMPT_IN_SCREENSHOT_OUT_ACTION
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_SCREENSHOT_OUT_CODE
            else:
                raise ValueError("Invalid action space: " + action_space)
        elif observation_type == "a11y_tree":
            if action_space == "computer_13":
                self.system_message = SYS_PROMPT_IN_A11Y_OUT_ACTION
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_A11Y_OUT_CODE
            else:
                raise ValueError("Invalid action space: " + action_space)
        elif observation_type == "screenshot_a11y_tree":
            if action_space == "computer_13":
                self.system_message = SYS_PROMPT_IN_BOTH_OUT_ACTION
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_BOTH_OUT_CODE
            else:
                raise ValueError("Invalid action space: " + action_space)
        elif observation_type == "som":
            if action_space == "computer_13":
                raise ValueError("Invalid action space: " + action_space)
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_SOM_OUT_TAG
            else:
                raise ValueError("Invalid action space: " + action_space)
        else:
            raise ValueError("Invalid experiment type: " + observation_type)

    def _create_response(self, **kwargs: Any) -> Any:
        """Request the next model response, retrying on any failure.

        The request sends ``self.cua_messages`` as input together with
        ``self.tools``. After each failed attempt the screenshot carried by the
        most recent message is refreshed from the environment (best effort)
        before trying again.

        Args:
            **kwargs: Additional arguments passed to ``client.responses.create``;
                they override the defaults built here.

        Returns:
            The response object returned by ``client.responses.create``.

        Raises:
            RuntimeError: If every attempt fails.
        """
        MAX_RETRIES = 200
        RETRY_DELAY_SECONDS = 5
        client = None
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                # Built lazily inside the try so a construction failure is
                # retried like any other error, but only one client is kept.
                if client is None:
                    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY_CUA"))
                request_args = {
                    "model": self.model,
                    "input": self.cua_messages,
                    "tools": self.tools,
                    "reasoning": {
                        "generate_summary": "concise",
                    },
                    "truncation": "auto",
                }
                request_args.update(kwargs)
                response = client.responses.create(**request_args)
                logger.debug("Received successful response from OpenAI API")
                logger.info(f"Response: {response}")
                return response
            except Exception as e:
                logger.error(f"OpenAI API error: {str(e)}")
                print(f"OpenAI API error: {str(e)}")
                self._refresh_last_screenshot()
                # No point in waiting once the retry budget is used up.
                if attempt < MAX_RETRIES:
                    time.sleep(RETRY_DELAY_SECONDS)
        logger.critical("Max retries exceeded for OpenAI API")
        raise RuntimeError("OpenAI API failed too many times")

    def _refresh_last_screenshot(self) -> None:
        """Best effort: put a fresh screenshot into the most recent message.

        Runs inside the retry loop's error handler, so it must never raise:
        any problem is logged and the message history is left untouched.
        """
        if not self.cua_messages:
            logger.warning("No messages available, cannot update screenshot")
            return

        try:
            new_screenshot = self.env._get_obs()
            new_screenshot_base64 = base64.b64encode(new_screenshot["screenshot"]).decode('utf-8')
        except Exception as obs_error:
            logger.warning(f"Could not capture a fresh screenshot: {obs_error}")
            return
        image_url = f"data:image/png;base64,{new_screenshot_base64}"

        # Update the image in the last message based on its structure
        last_message = self.cua_messages[-1]
        if not isinstance(last_message, dict):
            # The history also holds response output objects (see predict).
            logger.warning("Unknown message structure, cannot update screenshot")
        elif isinstance(last_message.get("output"), dict):
            # Computer call output message structure
            last_message["output"]["image_url"] = image_url
        elif "content" in last_message:
            # User message structure - find and update the image content
            for content_item in last_message["content"]:
                if isinstance(content_item, dict) and content_item.get("type") == "input_image":
                    content_item["image_url"] = image_url
                    break
        else:
            logger.warning("Unknown message structure, cannot update screenshot")
    
    def _handle_item(self, item: Dict[str, Any]) -> Optional[Union[str, Dict[str, Any]]]:
        """Parse a response item from the OpenAI API.
        
        Args:
            item: The response item to parse
            
        Returns:
            The parsed item as either a string message or a dictionary containing action information,
            or None if the item couldn't be parsed
        """
        if item.type == "message":
            if item.content is not None:
                response = item.content[0] if isinstance(item.content, list) else item.content
                response_type = response.type
                response_text = response.text
                logger.info(f"Received response text: {response_type} - {response_text}")
                if response_type == "output_text":
                    return response_text
                return None
            return None
        
        if item.type == "function_call":
            return None
            
        if item.type == "reasoning":
            reasoning = item.summary
            if isinstance(reasoning, list):
                reasoning_item = reasoning[0]
                reasoning_text = reasoning_item.text
                reasoning_type = reasoning_item.type
                if reasoning_type == "summary_text":
                    return reasoning_text
                return None
            return None
            
        if item.type == "computer_call":
            action = item.action
            action_type = action.type
            # Convert object attributes to dictionary
            action_args = {}
            for attr in dir(action):
                if attr.startswith('_') or attr == 'type':
                    continue
                try:
                    action_args[attr] = getattr(action, attr)
                except AttributeError:
                    pass
            logger.warning(f"Original Action: {action}")
            result_code = self._convert_cua_action_to_pyautogui_action(action_type, action_args)
            if result_code:
                return {
                    "action_space": "pyautogui",
                    "action": result_code,
                    "pending_checks": item.pending_safety_checks,
                    "call_id": item.call_id
                }
            return None
    
    def _convert_cua_action_to_pyautogui_action(self, action_type, args):
        """Convert a CUA action to a pyautogui action format
        
        This function converts OpenAI CUA actions to pyautogui commands
        for the Computer Agent Arena
        
        Args:
            action_type: Type of the CUA action
            args: Arguments for the action
            
        Returns:
            String with pyautogui command code or None if the action can't be converted
        """
        if not action_type:
            logger.warning("Empty CUA action received")
            return None
        
        key_mapping = {
            "/": "/",
            "\\": "\\",
            "alt": "alt",
            "arrowdown": "down",
            "arrowleft": "left",
            "arrowright": "right",
            "arrowup": "up",
            "backspace": "backspace",
            "capslock": "capslock",
            "cmd": "command",
            "ctrl": "ctrl",
            "delete": "delete",
            "end": "end",
            "enter": "enter",
            "esc": "esc",
            "home": "home",
            "insert": "insert",
            "option": "option",
            "pagedown": "pagedown",
            "pageup": "pageup",
            "shift": "shift",
            "space": "space",
            "super": "super",
            "tab": "tab",
            "win": "win",
        }
        try:
            if action_type == "click":
                x = args.get("x")
                y = args.get("y")
                button = args.get("button", "left")
                
                # Validate coordinates
                if x is None or y is None:
                    logger.warning(f"Invalid click coordinates: x={x}, y={y}")
                    return None
                
                # Validate button
                if button not in ["left", "middle", "right"]:
                    logger.warning(f"Invalid click button: {button}, defaulting to 'left'")
                    button = "left"
                
                return f"import pyautogui\npyautogui.moveTo({x}, {y})\npyautogui.click(button='{button}')"
                
            elif action_type == "double_click":
                x = args.get("x")
                y = args.get("y")
                
                # Validate coordinates
                if x is None or y is None:
                    logger.warning(f"Invalid double_click coordinates: x={x}, y={y}")
                    return None
                
                return f"import pyautogui\npyautogui.moveTo({x}, {y})\npyautogui.doubleClick()"
                
            elif action_type == "type":
                text = args.get("text", "")
                
                if not text:
                    logger.warning("Empty text for type action")
                    return "import pyautogui\n# Empty text, no action taken"
                
                # Use repr() to properly escape the string content without double-escaping
                pyautogui_code = f"""import pyautogui\npyautogui.typewrite({repr(text)})"""
                logger.info(f"Pyautogui code: {pyautogui_code}")
                return pyautogui_code
                
            elif action_type == "keypress":
                keys = args.get("keys", [])
                
                if not keys:
                    logger.warning("Empty keys for keypress action")
                    return None
                
                # Map to pyautogui keys and normalize
                mapped_keys = []
                for key in keys:
                    if isinstance(key, str):
                        # For Linux compatibility, handle the key mapping more thoroughly
                        mapped_key = key_mapping.get(key, key).lower()
                        # Also try lowercase version if not found
                        if mapped_key == key and key.lower() != key:
                            mapped_key = key_mapping.get(key.lower(), key)
                        mapped_keys.append(mapped_key)
                
                if not mapped_keys:
                    return None
                
                # Format for pyautogui.hotkey
                keys_str = ", ".join([f"'{k}'" for k in mapped_keys])
                
                return f"import pyautogui\npyautogui.hotkey({keys_str})"
                
            elif action_type == "scroll":
                x = args.get("x", None)
                y = args.get("y", None)
                scroll_x = args.get("scroll_x", 0)
                scroll_y = args.get("scroll_y", 0)
                
                # Normalize scroll values (Linux might use different scaling)
                scroll_y = int(scroll_y) if scroll_y else 0
                scroll_x = int(scroll_x) if scroll_x else 0
                
                # Default to current mouse position if coordinates not provided
                position_str = ""
                if x is not None and y is not None:
                    position_str = f", x={x}, y={y}"
                
                # Handle scroll direction
                if scroll_y != 0:
                    # Convert to clicks - normalize the amount
                    clicks = scroll_y  
                    return f"import pyautogui\npyautogui.scroll({clicks * (-1)}{position_str})"
                elif scroll_x != 0:
                    # Convert to clicks - normalize the amount
                    clicks = scroll_x
                    return f"import pyautogui\npyautogui.hscroll({clicks * (-1)}{position_str})"
                else:
                    logger.warning("Scroll action with zero scrolling amount")
                    return None
                
            elif action_type == "move":
                x = args.get("x")
                y = args.get("y")
                
                # Validate coordinates
                if x is None or y is None:
                    logger.warning(f"Invalid move coordinates: x={x}, y={y}")
                    return None
                
                return f"import pyautogui\npyautogui.moveTo({x}, {y})"
                
            elif action_type == "drag":
                if isinstance(args, dict):
                    path = args.get("path", None)
                else:
                    path = args.path
                
                if not path or len(path) < 2:
                    logger.warning("Drag path must have at least two points")
                    return None
                
                # Extract start and end points
                start = path[0]
                end = path[-1]
                
                # Validate path coordinates - handle different object formats
                valid_path = True
                for point in path:
                    if isinstance(point, (list, tuple)) and len(point) == 2:
                        continue
                    elif isinstance(point, dict) and 'x' in point and 'y' in point:
                        continue
                    elif hasattr(point, 'x') and hasattr(point, 'y'):
                        continue
                    else:
                        valid_path = False
                        break
                
                if not valid_path:
                    logger.warning("Invalid path format for drag action")
                    return None
                
                if len(path) == 2:
                    # Extract coordinates, handling different formats
                    if isinstance(start, (list, tuple)):
                        start_x, start_y = start
                    elif isinstance(start, dict):
                        start_x, start_y = start.get('x'), start.get('y')
                    else:  # object with attributes
                        start_x, start_y = start.x, start.y
                        
                    if isinstance(end, (list, tuple)):
                        end_x, end_y = end
                    elif isinstance(end, dict):
                        end_x, end_y = end.get('x'), end.get('y')
                    else:  # object with attributes
                        end_x, end_y = end.x, end.y
                    
                    return (
                        f"import pyautogui\n"
                        f"pyautogui.moveTo({start_x}, {start_y})\n"
                        f"pyautogui.dragTo({end_x}, {end_y}, duration=0.5, button='left')"
                    )
                # For complex paths with multiple points
                else:
                    actions = []
                    # Handle first point
                    if isinstance(path[0], (list, tuple)):
                        first_x, first_y = path[0]
                    elif isinstance(path[0], dict):
                        first_x, first_y = path[0].get('x'), path[0].get('y')
                    else:  # object with attributes
                        first_x, first_y = path[0].x, path[0].y
                        
                    actions.append(f"import pyautogui\npyautogui.moveTo({first_x}, {first_y})")
                    
                    for i in range(1, len(path)):
                        if isinstance(path[i], (list, tuple)):
                            x, y = path[i]
                        elif isinstance(path[i], dict):
                            x, y = path[i].get('x'), path[i].get('y')
                        else:  # object with attributes
                            x, y = path[i].x, path[i].y
                            
                        actions.append(f"pyautogui.dragTo({x}, {y}, duration=0.2, button='left')")
                    
                    return "\n".join(actions)
                
            elif action_type == "wait":
                ms = args.get("ms", 1000)  # Default to 1000ms (1 second)
                seconds = max(0.1, ms / 1000)  # Ensure minimum wait time
                
                return f"import time\ntime.sleep({seconds})"
                
            elif action_type == "screenshot":
                # Just return a wait action, as screenshots are handled automatically
                return "import time\ntime.sleep(0.1)  # Screenshot requested, no direct action needed"
                
            else:
                logger.warning(f"Unknown action type: {action_type}")
                return None
                
        except Exception as e:
            logger.exception(f"Error converting CUA action to agent action: {e}")
            return None
    
    def predict(self, instruction: str, obs: Dict) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """Ask the model for the next action(s) given the current observation.

        On the first call (empty message history) a user message holding the
        screenshot from ``obs`` and the instruction followed by
        ``OPERATOR_PROMPT`` is added. On later calls ``obs`` is not read: the
        history already contains whatever ``step`` appended.

        Args:
            instruction: The task instruction.
            obs: Observation dict; ``obs["screenshot"]`` (image bytes) is used
                for the first message.

        Returns:
            Tuple ``(predict_info, actions)``:
                - predict_info: model timing and token usage, the message
                  history, the joined text responses and ``state_correct``
                  (True when the response contained a computer call).
                - actions: the pyautogui action dicts parsed from the response.
        """
        if not self.cua_messages:
            # The screenshot is only needed for the opening message, so it is
            # encoded only here.
            base64_image = encode_image(obs["screenshot"])
            self.cua_messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{base64_image}",
                    },
                    {
                        "type": "input_text",
                        "text": "\n        " + instruction + OPERATOR_PROMPT,
                    }
                ]
            })

        with Timer() as model_timer:
            response = self._create_response()
        # Keep the model output in the history so the next request sees it.
        self.cua_messages += response.output

        actions = []
        responses = []
        action_exit = False
        thought_exit = False
        message_exit = False
        for item in response.output:
            parsed_item = self._handle_item(item)
            if isinstance(parsed_item, dict) and parsed_item.get("action_space", None) == "pyautogui":
                actions.append(parsed_item)
            elif parsed_item is not None:
                responses.append(parsed_item)
            if item.type == "computer_call":
                action_exit = True
            if item.type == "reasoning" and item.summary and item.summary[0].type == "summary_text":
                thought_exit = True
            if item.type == "message" and item.content and item.content[0].type == "output_text":
                message_exit = True

        logger.info(f"Actions: {actions}")
        logger.info(f"Responses: {responses}")

        # The response is considered well-formed when it contains a computer call.
        state_correct = action_exit
        if not state_correct:
            logger.warning("The state of the agent is not correct, action_exit: %s, thought_exit: %s, message_exit: %s", action_exit, thought_exit, message_exit)

        all_text = all(isinstance(entry, str) for entry in responses)
        predict_info = {
            "model_usage": {
                "model_time": model_timer.duration,
                "prompt_tokens": response.usage.input_tokens,
                "completion_tokens": response.usage.output_tokens,
            },
            "messages": self.cua_messages,
            "response": "\n".join(responses) if all_text else "",
            "state_correct": state_correct,
        }

        return predict_info, actions


    def reset(self, _logger=None):
        global logger
        logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")

        self.thoughts = []
        self.actions = []
        self.observations = []
        self.cua_messages = []

    def step(self, action: Dict[str, Any]) -> Tuple[Any, Any, bool, Dict[str, Any], Dict[str, Any]]:
        """Execute an action in the environment and record the resulting screenshot.

        After the environment step, a ``computer_call_output`` message holding
        the new screenshot is appended to the message history, acknowledging
        the action's pending safety checks.

        Args:
            action: Action dict as produced by ``predict``; reads ``action``,
                ``call_id`` and ``pending_checks``.

        Returns:
            Tuple ``(obs, reward, terminated, info, step_info)`` where the first
            four values come from ``env.step`` and ``step_info`` holds
            ``step_time`` and the executed ``action``.
            For an empty action nothing is executed and
            ``(None, 0, True, {}, step_info)`` is returned, so both paths can be
            unpacked the same way.

        Raises:
            StepError: If the step execution fails.
        """
        try:
            if not action:
                logger.warning("Empty action received, terminating episode")
                # Same arity as the normal path (this used to be a 2-tuple).
                return None, 0, True, {}, {"step_time": 0.0, "action": action}

            logger.info(f"Executing action: {action.get('action_space', 'unknown')} - {action.get('action', '')[:50]}...")

            with Timer() as step_timer:
                # Convert the action to an Action object
                step_action = Action(action.get("action", ""), self.action_space)
                # Execute the action in the environment
                obs, reward, terminated, info = self.env.step(step_action.get_action())

                screenshot_base64 = encode_image(obs["screenshot"])

                self.cua_messages.append({
                    "type": "computer_call_output",
                    "call_id": action["call_id"],
                    "acknowledged_safety_checks": action["pending_checks"],
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}",
                    },
                })

            logger.debug(f"Action completed in {step_timer.duration:.2f}s")
            if terminated:
                logger.info("Environment signaled termination")

            return obs, reward, terminated, info, {
                "step_time": step_timer.duration,
                "action": action
            }

        except Exception as e:
            logger.exception(f"Environment step failed: {str(e)}")
            # Chain the original error so its traceback is preserved.
            raise StepError(f"Failed to execute step: {str(e)}") from e
        
class StepError(Exception):
    """Raised when executing an agent step in the environment fails."""