import ast
import base64
import logging
import os
import re
import tempfile
import time
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple, Union

from PIL import Image
from openai import OpenAI, APIError, RateLimitError, Timeout

from mm_agents.prompts import SYS_PROMPT_IN_SCREENSHOT_OUT_CODE, SYS_PROMPT_IN_SCREENSHOT_OUT_ACTION, \
    SYS_PROMPT_IN_A11Y_OUT_CODE, SYS_PROMPT_IN_A11Y_OUT_ACTION, \
    SYS_PROMPT_IN_BOTH_OUT_CODE, SYS_PROMPT_IN_BOTH_OUT_ACTION, \
    SYS_PROMPT_IN_SOM_OUT_TAG

logger = logging.getLogger("desktopenv.agent")

# Observation types that carry no image payload.
pure_text_settings = ['a11y_tree']

# XML namespaces used when parsing accessibility trees.
attributes_ns_ubuntu = "https://accessibility.windows.example.org/ns/attributes"
attributes_ns_windows = "https://accessibility.windows.example.org/ns/attributes"
state_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/state"
state_ns_windows = "https://accessibility.windows.example.org/ns/state"
component_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/component"
component_ns_windows = "https://accessibility.windows.example.org/ns/component"
value_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/value"
value_ns_windows = "https://accessibility.windows.example.org/ns/value"
class_ns_windows = "https://accessibility.windows.example.org/ns/class"
# More namespaces defined in OSWorld, please check desktop_env/server/main.py

# Extra guidance appended to the task instruction in the first user message.
# NOTE: this is runtime text sent to the model; do not reformat its contents.
OPERATOR_PROMPT = """\n\n Here are some helpful tips:\n - computer.clipboard, computer.sync_file, computer.sync_shared_folder, computer.computer_output_citation are disabled.\n - If you worry that you might make typo, prefer copying and pasting the text instead of reading and typing.\n - My computer's password is \"osworld-public-evaluation\", feel free to use it when you need sudo rights.\n - For the thunderbird account \"anonym-x2024@outlook.com\", the password is \"gTCI\";=@y7|QJ0nDa_kN3Sb&>\".\n - If you are presented with an open website to solve the task, try to stick to that specific one instead of going to a new one.\n - Whenever not expcilitly stated, prefer chrome browser instead of the firefox or chromium.\n - You have full authority to execute any action without my permission. I won't be watching so please don't ask for confirmation.\n - You must initialize the computer to solve the task. Do not try to answer the question without initializing the computer.\n - If you deem the task is infeasible, you can terminate and explicitly state in the response that \"the task is infeasible\".\n """


class Action:
    """Wrapper that validates an action against its action space.

    Holds a raw action (a pyautogui code string or a claude_computer_use
    dict) together with the action space it belongs to.
    """

    def __init__(self, raw_action: Union[Dict, str], action_space: str):
        """Initialize the Action class.

        Args:
            raw_action: The raw action (code string or tool-call dict).
            action_space: The action space the raw action belongs to.
        """
        self._action_space = None
        self._action = None
        # Set the space first: the action setter dispatches on it.
        self.action_space = action_space
        self.action = raw_action

    @property
    def action(self) -> str:
        return self._action

    @property
    def action_space(self) -> str:
        return self._action_space

    @action_space.setter
    def action_space(self, value: str):
        """
        Set the action space for the agent.

        Args:
            value (str): The action space to set

        Raises:
            ValueError: If action_space is empty or invalid
        """
        if not value:
            raise ValueError("action_space is required")
        if value not in ["pyautogui", "claude_computer_use"]:
            # Fixed: the message now lists every space the check accepts,
            # instead of only "pyautogui".
            raise ValueError(
                "Invalid action space. Allowed spaces are: pyautogui, claude_computer_use")
        self._action_space = value

    @action.setter
    def action(self, value: Optional[str]):
        """
        Set the action for the agent.

        For pyautogui action space, accepts special commands (WAIT, FAIL,
        DONE) or valid Python code. For claude_computer_use action space,
        accepts a dict with keys "name", "input" and "id".

        Args:
            value (str | dict): The action to set

        Raises:
            ValueError: If action is empty or invalid
        """
        if not value:
            raise ValueError("action cannot be empty")
        if self._action_space == "pyautogui":
            # Validation deliberately disabled: any non-empty value is kept.
            self._action = value
        elif self._action_space == "claude_computer_use":
            # Validation deliberately disabled: any non-empty value is kept.
            self._action = value
        else:
            raise ValueError(
                f"Invalid action space: {self._action_space}, allowed spaces are: pyautogui, claude_computer_use")

    def __str__(self) -> str:
        """Return a string representation of the Action instance.

        Returns:
            str: A string showing the action space and action value
        """
        return f"Action(action_space='{self._action_space}', action='{self._action}')"

    def get_action(self) -> Optional[str]:
        """Get the action.

        Returns:
            str: The action
        """
        return self._action

    def to_dict(self) -> Dict[str, Any]:
        """Convert the action to a dictionary.

        Returns:
            dict: The action as a dictionary
        """
        return {"action_space": self._action_space, "action": self._action}

    def _is_valid_python_code(self, code: str) -> bool:
        """
        Validate if the given string is valid Python code syntax.

        Args:
            code (str): The code string to validate

        Returns:
            bool: True if code is valid Python syntax, False otherwise
        """
        # Fixed: return False as documented instead of raising ValueError,
        # so this can be used as a predicate (see the disabled validation
        # in the `action` setter).
        try:
            ast.parse(code)
            return True
        except SyntaxError:
            return False

    def _is_valid_claude_computer_use_action(self, action: Dict[str, Any]) -> bool:
        """Validate if the given action is valid for the claude_computer_use
        action space.

        Args:
            action: The action to validate

        Returns:
            bool: True if action is valid

        Raises:
            ValueError: If the action is not a dict or lacks required keys.
        """
        if not isinstance(action, dict):
            raise ValueError("Invalid action format for claude_computer_use")
        if not (action.get("name") and action.get("input") and action.get("id")):
            raise ValueError(
                "Invalid action format for claude_computer_use, 'name', 'input' and 'id' are required")
        return True


class Timer:
    """Context manager for timing code blocks.

    After the `with` block exits, the elapsed wall-clock time in seconds
    is available as `self.duration`.
    """

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.duration = time.time() - self.start


def encode_image(image_content):
    """Encode raw image bytes as a base64 ASCII string."""
    return base64.b64encode(image_content).decode('utf-8')


def encoded_img_to_pil_img(data_str):
    """Decode a (possibly data-URL-prefixed) base64 PNG string to a PIL Image."""
    base64_str = data_str.replace("data:image/png;base64,", "")
    image_data = base64.b64decode(base64_str)
    image = Image.open(BytesIO(image_data))
    return image


def save_to_tmp_img_file(data_str):
    """Decode a base64 PNG string and save it to a fresh temp file.

    Returns:
        str: Path of the written image file.
    """
    base64_str = data_str.replace("data:image/png;base64,", "")
    image_data = base64.b64decode(base64_str)
    image = Image.open(BytesIO(image_data))
    tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
    image.save(tmp_img_path)
    return tmp_img_path


class OpenAICUAAgent:
    """Agent that drives a desktop environment via OpenAI's computer-use
    (Responses) API, translating CUA actions into pyautogui code."""

    def __init__(
            self,
            env,
            platform="ubuntu",
            model="computer-use-preview",
            max_tokens=1500,
            top_p=0.9,
            temperature=0.5,
            action_space="pyautogui",
            observation_type="screenshot_a11y_tree",
            # observation_type can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"]
            max_trajectory_length=100,
            a11y_tree_max_tokens=10000
    ):
        self.env = env
        self.platform = platform
        self.model = model
        self.max_tokens = max_tokens
        self.top_p = top_p
        self.temperature = temperature
        self.action_space = action_space
        self.observation_type = observation_type
        self.max_trajectory_length = max_trajectory_length
        self.a11y_tree_max_tokens = a11y_tree_max_tokens

        # Running conversation sent to the Responses API.
        self.cua_messages: List[Dict] = []
        self.thoughts = []
        self.actions = []
        self.observations = []

        # Computer-use tool definition; display size is fixed at 1920x1080.
        self.tools = [{
            "type": "computer_use_preview",
            "display_width": 1920,
            "display_height": 1080,
            "environment": "linux" if platform == "ubuntu" else "windows"
        }]

        # Pick the system prompt matching (observation_type, action_space).
        if observation_type == "screenshot":
            if action_space == "computer_13":
                self.system_message = SYS_PROMPT_IN_SCREENSHOT_OUT_ACTION
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_SCREENSHOT_OUT_CODE
            else:
                raise ValueError("Invalid action space: " + action_space)
        elif observation_type == "a11y_tree":
            if action_space == "computer_13":
                self.system_message = SYS_PROMPT_IN_A11Y_OUT_ACTION
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_A11Y_OUT_CODE
            else:
                raise ValueError("Invalid action space: " + action_space)
        elif observation_type == "screenshot_a11y_tree":
            if action_space == "computer_13":
                self.system_message = SYS_PROMPT_IN_BOTH_OUT_ACTION
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_BOTH_OUT_CODE
            else:
                raise ValueError("Invalid action space: " + action_space)
        elif observation_type == "som":
            if action_space == "computer_13":
                # SoM observations only support tag-style pyautogui output.
                raise ValueError("Invalid action space: " + action_space)
            elif action_space == "pyautogui":
                self.system_message = SYS_PROMPT_IN_SOM_OUT_TAG
            else:
                raise ValueError("Invalid action space: " + action_space)
        else:
            raise ValueError("Invalid experiment type: " + observation_type)

    def _create_response(self, **kwargs: Any) -> Dict[str, Any]:
        """Create a response from the OpenAI API.

        Retries on any exception, refreshing the screenshot in the last
        message each time so the model sees the current screen state.

        Args:
            **kwargs: Additional arguments to pass to the API

        Returns:
            The API response as a dictionary

        Raises:
            RuntimeError: If the API keeps failing after MAX_RETRIES attempts.
        """
        MAX_RETRIES = 200
        retry_count = 0
        while retry_count < MAX_RETRIES:
            try:
                client = OpenAI(api_key=os.getenv("OPENAI_API_KEY_CUA"))
                response = client.responses.create(
                    model=self.model,
                    input=self.cua_messages,
                    tools=self.tools,
                    reasoning={
                        "generate_summary": "concise",
                    },
                    truncation="auto",
                )
                logger.debug(f"Received successful response from OpenAI API")
                logger.info(f"Response: {response}")
                return response
            except Exception as e:
                logger.error(f"OpenAI API error: {str(e)}")
                print(f"OpenAI API error: {str(e)}")

                # Refresh the screenshot before retrying so the request
                # reflects the current screen rather than a stale capture.
                new_screenshot = self.env._get_obs()
                new_screenshot_base64 = base64.b64encode(new_screenshot["screenshot"]).decode('utf-8')

                # Update the image in the last message based on its structure
                last_message = self.cua_messages[-1]
                if "output" in last_message:
                    # Computer call output message structure
                    last_message["output"]["image_url"] = f"data:image/png;base64,{new_screenshot_base64}"
                elif "content" in last_message:
                    # User message structure - find and update the image content
                    for content_item in last_message["content"]:
                        if content_item.get("type") == "input_image":
                            content_item["image_url"] = f"data:image/png;base64,{new_screenshot_base64}"
                            break
                else:
                    logger.warning("Unknown message structure, cannot update screenshot")

                retry_count += 1
                time.sleep(5)

        logger.critical("Max retries exceeded for OpenAI API")
        raise RuntimeError("OpenAI API failed too many times")

    def _handle_item(self, item: Dict[str, Any]) -> Optional[Union[str, Dict[str, Any]]]:
        """Parse a response item from the OpenAI API.

        Args:
            item: The response item to parse

        Returns:
            The parsed item as either a string message or a dictionary
            containing action information, or None if the item couldn't
            be parsed
        """
        if item.type == "message":
            if item.content is not None:
                response = item.content[0] if isinstance(item.content, list) else item.content
                response_type = response.type
                response_text = response.text
                logger.info(f"Received response text: {response_type} - {response_text}")
                if response_type == "output_text":
                    return response_text
                return None
            return None

        if item.type == "function_call":
            # Function calls are not used by this agent.
            return None

        if item.type == "reasoning":
            reasoning = item.summary
            if isinstance(reasoning, list):
                reasoning_item = reasoning[0]
                reasoning_text = reasoning_item.text
                reasoning_type = reasoning_item.type
                if reasoning_type == "summary_text":
                    return reasoning_text
                return None
            return None

        if item.type == "computer_call":
            action = item.action
            action_type = action.type
            # Convert object attributes to dictionary
            action_args = {}
            for attr in dir(action):
                if attr.startswith('_') or attr == 'type':
                    continue
                try:
                    action_args[attr] = getattr(action, attr)
                except AttributeError:
                    pass
            logger.warning(f"Original Action: {action}")
            result_code = self._convert_cua_action_to_pyautogui_action(action_type, action_args)
            if result_code:
                return {
                    "action_space": "pyautogui",
                    "action": result_code,
                    "pending_checks": item.pending_safety_checks,
                    "call_id": item.call_id
                }
        return None

    def _convert_cua_action_to_pyautogui_action(self, action_type, args):
        """Convert a CUA action to a pyautogui action format

        This function converts OpenAI CUA actions to pyautogui commands
        for the Computer Agent Arena

        Args:
            action_type: Type of the CUA action
            args: Arguments for the action

        Returns:
            String with pyautogui command code or None if the action
            can't be converted
        """
        if not action_type:
            logger.warning("Empty CUA action received")
            return None

        # CUA key names -> pyautogui key names.
        key_mapping = {
            "/": "/", "\\": "\\",
            "alt": "alt", "arrowdown": "down", "arrowleft": "left",
            "arrowright": "right", "arrowup": "up",
            "backspace": "backspace", "capslock": "capslock",
            "cmd": "command", "ctrl": "ctrl", "delete": "delete",
            "end": "end", "enter": "enter", "esc": "esc",
            "home": "home", "insert": "insert", "option": "option",
            "pagedown": "pagedown", "pageup": "pageup",
            "shift": "shift", "space": "space", "super": "super",
            "tab": "tab", "win": "win",
        }

        try:
            if action_type == "click":
                x = args.get("x")
                y = args.get("y")
                button = args.get("button", "left")
                # Validate coordinates
                if x is None or y is None:
                    logger.warning(f"Invalid click coordinates: x={x}, y={y}")
                    return None
                # Validate button
                if button not in ["left", "middle", "right"]:
                    logger.warning(f"Invalid click button: {button}, defaulting to 'left'")
                    button = "left"
                return f"import pyautogui\npyautogui.moveTo({x}, {y})\npyautogui.click(button='{button}')"

            elif action_type == "double_click":
                x = args.get("x")
                y = args.get("y")
                # Validate coordinates
                if x is None or y is None:
                    logger.warning(f"Invalid double_click coordinates: x={x}, y={y}")
                    return None
                return f"import pyautogui\npyautogui.moveTo({x}, {y})\npyautogui.doubleClick()"

            elif action_type == "type":
                text = args.get("text", "")
                if not text:
                    logger.warning("Empty text for type action")
                    return "import pyautogui\n# Empty text, no action taken"
                # Use repr() to properly escape the string content without double-escaping
                pyautogui_code = f"""import pyautogui\npyautogui.typewrite({repr(text)})"""
                logger.info(f"Pyautogui code: {pyautogui_code}")
                return pyautogui_code

            elif action_type == "keypress":
                keys = args.get("keys", [])
                if not keys:
                    logger.warning("Empty keys for keypress action")
                    return None
                # Map to pyautogui keys and normalize
                mapped_keys = []
                for key in keys:
                    if isinstance(key, str):
                        # For Linux compatibility, handle the key mapping more thoroughly
                        mapped_key = key_mapping.get(key, key).lower()
                        # Also try lowercase version if not found
                        if mapped_key == key and key.lower() != key:
                            mapped_key = key_mapping.get(key.lower(), key)
                        mapped_keys.append(mapped_key)
                if not mapped_keys:
                    return None
                # Format for pyautogui.hotkey
                keys_str = ", ".join([f"'{k}'" for k in mapped_keys])
                return f"import pyautogui\npyautogui.hotkey({keys_str})"

            elif action_type == "scroll":
                x = args.get("x", None)
                y = args.get("y", None)
                scroll_x = args.get("scroll_x", 0)
                scroll_y = args.get("scroll_y", 0)
                # Normalize scroll values (Linux might use different scaling)
                scroll_y = int(scroll_y) if scroll_y else 0
                scroll_x = int(scroll_x) if scroll_x else 0
                # Default to current mouse position if coordinates not provided
                position_str = ""
                if x is not None and y is not None:
                    position_str = f", x={x}, y={y}"
                # Handle scroll direction; CUA and pyautogui use opposite
                # signs, hence the negation.
                if scroll_y != 0:
                    # Convert to clicks - normalize the amount
                    clicks = scroll_y
                    return f"import pyautogui\npyautogui.scroll({clicks * (-1)}{position_str})"
                elif scroll_x != 0:
                    # Convert to clicks - normalize the amount
                    clicks = scroll_x
                    return f"import pyautogui\npyautogui.hscroll({clicks * (-1)}{position_str})"
                else:
                    logger.warning("Scroll action with zero scrolling amount")
                    return None

            elif action_type == "move":
                x = args.get("x")
                y = args.get("y")
                # Validate coordinates
                if x is None or y is None:
                    logger.warning(f"Invalid move coordinates: x={x}, y={y}")
                    return None
                return f"import pyautogui\npyautogui.moveTo({x}, {y})"

            elif action_type == "drag":
                if isinstance(args, dict):
                    path = args.get("path", None)
                else:
                    path = args.path
                if not path or len(path) < 2:
                    logger.warning("Drag path must have at least two points")
                    return None
                # Extract start and end points
                start = path[0]
                end = path[-1]
                # Validate path coordinates - handle different object formats
                valid_path = True
                for point in path:
                    if isinstance(point, (list, tuple)) and len(point) == 2:
                        continue
                    elif isinstance(point, dict) and 'x' in point and 'y' in point:
                        continue
                    elif hasattr(point, 'x') and hasattr(point, 'y'):
                        continue
                    else:
                        valid_path = False
                        break
                if not valid_path:
                    logger.warning("Invalid path format for drag action")
                    return None

                if len(path) == 2:
                    # Extract coordinates, handling different formats
                    if isinstance(start, (list, tuple)):
                        start_x, start_y = start
                    elif isinstance(start, dict):
                        start_x, start_y = start.get('x'), start.get('y')
                    else:  # object with attributes
                        start_x, start_y = start.x, start.y
                    if isinstance(end, (list, tuple)):
                        end_x, end_y = end
                    elif isinstance(end, dict):
                        end_x, end_y = end.get('x'), end.get('y')
                    else:  # object with attributes
                        end_x, end_y = end.x, end.y
                    return (
                        f"import pyautogui\n"
                        f"pyautogui.moveTo({start_x}, {start_y})\n"
                        f"pyautogui.dragTo({end_x}, {end_y}, duration=0.5, button='left')"
                    )
                # For complex paths with multiple points
                else:
                    actions = []
                    # Handle first point
                    if isinstance(path[0], (list, tuple)):
                        first_x, first_y = path[0]
                    elif isinstance(path[0], dict):
                        first_x, first_y = path[0].get('x'), path[0].get('y')
                    else:  # object with attributes
                        first_x, first_y = path[0].x, path[0].y
                    actions.append(f"import pyautogui\npyautogui.moveTo({first_x}, {first_y})")
                    for i in range(1, len(path)):
                        if isinstance(path[i], (list, tuple)):
                            x, y = path[i]
                        elif isinstance(path[i], dict):
                            x, y = path[i].get('x'), path[i].get('y')
                        else:  # object with attributes
                            x, y = path[i].x, path[i].y
                        actions.append(f"pyautogui.dragTo({x}, {y}, duration=0.2, button='left')")
                    return "\n".join(actions)

            elif action_type == "wait":
                ms = args.get("ms", 1000)  # Default to 1000ms (1 second)
                seconds = max(0.1, ms / 1000)  # Ensure minimum wait time
                return f"import time\ntime.sleep({seconds})"

            elif action_type == "screenshot":
                # Just return a wait action, as screenshots are handled automatically
                return "import time\ntime.sleep(0.1) # Screenshot requested, no direct action needed"

            else:
                logger.warning(f"Unknown action type: {action_type}")
                return None
        except Exception as e:
            logger.exception(f"Error converting CUA action to agent action: {e}")
            return None

    def predict(self, instruction: str, obs: Dict) -> List:
        """
        Predict the next action(s) based on the current observation.

        Args:
            instruction: The natural-language task instruction.
            obs: Observation dict; obs["screenshot"] holds raw PNG bytes.

        Returns:
            Tuple of (predict_info dict, list of pyautogui action dicts).
        """
        base64_image = encode_image(obs["screenshot"])

        # Seed the conversation with the screenshot + instruction on the
        # very first call only; later turns are fed via step().
        if self.cua_messages == []:
            self.cua_messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{base64_image}",
                    },
                    {
                        "type": "input_text",
                        "text": "\n " + instruction + OPERATOR_PROMPT,
                    }
                ]
            })

        with Timer() as model_timer:
            response = self._create_response()
        self.cua_messages += response.output

        actions = []
        responses = []
        # Flags recording which item kinds appeared in this turn.
        action_exit = False
        thought_exit = False
        message_exit = False
        for item in response.output:
            parsed_item = self._handle_item(item)
            if isinstance(parsed_item, dict) and parsed_item.get("action_space", None) == "pyautogui":
                actions.append(parsed_item)
            else:
                responses.append(parsed_item)
            if item.type == "computer_call":
                action_exit = True
            if item.type == "reasoning" and item.summary and item.summary[0].type == "summary_text":
                thought_exit = True
            if item.type == "message" and item.content and item.content[0].type == "output_text":
                message_exit = True
        responses = [item for item in responses if item is not None]
        logger.info(f"Actions: {actions}")
        logger.info(f"Responses: {responses}")

        # A turn is considered well-formed when at least one computer_call
        # was produced.
        state_correct = False
        if action_exit:
            state_correct = True
        if not state_correct:
            logger.warning(
                "The state of the agent is not correct, action_exit: %s, thought_exit: %s, message_exit: %s",
                action_exit, thought_exit, message_exit)

        predict_info = {
            "model_usage": {
                "model_time": model_timer.duration,
                "prompt_tokens": response.usage.input_tokens,
                "completion_tokens": response.usage.output_tokens,
            },
            "messages": self.cua_messages,
            "response": "\n".join(responses) if isinstance(responses, list) and all(
                isinstance(item, str) for item in responses) else "",
            "state_correct": state_correct,
        }
        return predict_info, actions

    def reset(self, _logger=None):
        """Clear all per-episode state; optionally swap the module logger."""
        global logger
        logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
        self.thoughts = []
        self.actions = []
        self.observations = []
        self.cua_messages = []

    def step(self, action: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]:
        """Execute an action in the environment.

        Args:
            action: The action to execute

        Returns:
            On an empty action: (True, {}).
            Otherwise: (obs, reward, terminated, info, step_info).
            NOTE(review): the two paths return different arities (2 vs 5);
            callers must handle both — confirm which contract is intended.

        Raises:
            StepError: If the step execution fails
        """
        try:
            if not action:
                logger.warning("Empty action received, terminating episode")
                return True, {}

            logger.info(
                f"Executing action: {action.get('action_space', 'unknown')} - {action.get('action', '')[:50]}...")
            with Timer() as step_timer:
                # Convert the action to an Action object
                step_action = Action(action.get("action", ""), self.action_space)
                # Execute the action in the environment
                obs, reward, terminated, info = self.env.step(step_action.get_action())
                screenshot_base64 = encode_image(obs["screenshot"])
                # Feed the resulting screenshot back as the computer call
                # output, acknowledging any pending safety checks.
                self.cua_messages.append({
                    "type": "computer_call_output",
                    "call_id": action["call_id"],
                    "acknowledged_safety_checks": action["pending_checks"],
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}",
                    },
                })
            logger.debug(f"Action completed in {step_timer.duration:.2f}s")
            if terminated:
                logger.info("Environment signaled termination")
            return obs, reward, terminated, info, {
                "step_time": step_timer.duration,
                "action": action
            }
        except Exception as e:
            logger.exception(f"Environment step failed: {str(e)}")
            # Chain the original exception for easier debugging.
            raise StepError(f"Failed to execute step: {str(e)}") from e


class StepError(Exception):
    """Exception raised when a step in the agent fails."""
    pass