""" OpenCUA Agent Implementation This module implements an OpenCUA agent for desktop automation tasks, building upon existing frameworks and integrating multiple coordinate mapping systems. Framework and Implementation Sources: - Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py - Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py - Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py """ import re import os import ast import time import math import httpx import base64 import backoff from loguru import logger from typing import Dict, List, Tuple, Optional # System prompts used in the training data AGNET_SYS_PROMPT_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip() # AGNET_SYS_PROMPT_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip() AGNET_SYS_PROMPT_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. \n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip() # Testing prompt on OSWorld-Verified AGNET_SYS_PROMPT_L2 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task. The password of the computer is "osworld-public-evaluation". If the task is not possible to do, output the action computer.terminate(status='failure'). For each step, provide your response in this format: Thought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning Action:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize "—", maximize "□", close "X")\n - if the action involves keyboard actions like \'press\', \'write\', \'hotkey\':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions Finally, output the action as PyAutoGUI code or the following functions: - {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}} - {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}} """.strip() STEP_TEMPLATE = "# Step {step_num}:\n" INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n" ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n" THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n" OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n" DETAIL_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n" def encode_image(image_content): """Encode the image to base64""" return base64.b64encode(image_content).decode('utf-8') def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]: """Parse response including Observation, Thought, Action and code block""" try: sections = {} obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE) if obs_match: sections['observation'] = obs_match.group(1).strip() thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE) if thought_match: sections['thought'] = thought_match.group(1).strip() action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE) if action_match: action = action_match.group(1).strip() sections['action'] = action.strip() if "computer.terminate" in input_string.lower(): # Look for code blocks that might contain terminate command code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE) if code_blocks: last_code = code_blocks[-1].strip().lower() if "fail" in last_code: sections['code'] = "FAIL" return "FAIL", ["FAIL"], sections elif "success" in last_code: sections['code'] = "DONE" return "DONE", ["DONE"], sections # Default to DONE if terminate is mentioned but no specific status sections['code'] = "DONE" return "DONE", ["DONE"], sections code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL) if code_blocks: code = code_blocks[-1].strip() sections['original_code'] = transform_agnet_action_to_code_block(code) corrected_code = correct_pyautogui_arguments(code) sections['code'] = corrected_code sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type) else: # No code blocks found sections['code'] = "WAIT" return "WAIT", ["WAIT"], sections if 'code' not in sections: logger.error("Missing required action or code section") return None, None, {} if 'action' not in sections: sections['action'] = "" return sections['action'], [sections['code']], sections except Exception as e: logger.exception(f"Error parsing response: {str(e)}\nInput string: {input_string}") return None, None, {} def correct_pyautogui_arguments(code: str) -> str: """Correct the pyautogui arguments""" function_corrections = { 'write': { 'incorrect_args': ['text', 'content'], 'correct_args': [], 'keyword_arg': 'message' }, 'press': { 'incorrect_args': ['key', 'button'], 'correct_args': [], 'keyword_arg': None }, 'hotkey': { 'incorrect_args': ['key1', 'key2', 'keys'], 'correct_args': [], 'keyword_arg': None }, } lines = code.strip().split('\n') corrected_lines = [] for line in lines: line = line.strip() match = re.match(r'(pyautogui\.(\w+))\((.*)\)', line) if match: full_func_call = match.group(1) func_name = match.group(2) args_str = match.group(3) if func_name in function_corrections: func_info = function_corrections[func_name] args = split_args(args_str) corrected_args = [] for arg in args: arg = arg.strip() kwarg_match = re.match(r'(\w+)\s*=\s*(.*)', arg) if kwarg_match: arg_name = kwarg_match.group(1) arg_value = kwarg_match.group(2) if arg_name in func_info['incorrect_args']: if func_info['keyword_arg']: corrected_args.append(f"{func_info['keyword_arg']}={arg_value}") else: corrected_args.append(arg_value) else: corrected_args.append(f'{arg_name}={arg_value}') else: corrected_args.append(arg) corrected_args_str = ', '.join(corrected_args) corrected_line = f'{full_func_call}({corrected_args_str})' corrected_lines.append(corrected_line) else: corrected_lines.append(line) else: corrected_lines.append(line) corrected_code = '\n'.join(corrected_lines) return corrected_code def split_args(args_str: str) -> List[str]: """Split the arguments string into a list of arguments""" args = [] current_arg = '' within_string = False string_char = '' prev_char = '' for char in args_str: if char in ['"', "'"]: if not within_string: within_string = True string_char = char elif within_string and prev_char != '\\' and char == string_char: within_string = False if char == ',' and not within_string: args.append(current_arg) current_arg = '' else: current_arg += char prev_char = char if current_arg: args.append(current_arg) return args def smart_resize( height: int, width: int, factor: int, min_pixels: int, max_pixels: int, max_aspect_ratio_allowed: Optional[float] = None, size_can_be_smaller_than_factor: bool = False, ): """ The function is modified from https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py Qwen2.5-VL based model need this function to resize screenshots. Rescales the image so that the following conditions are met: 1. Both dimensions (height and width) are divisible by 'factor'. 2. The total number of pixels is within the range ['min_pixels', 'max_pixels']. 3. The aspect ratio of the image is maintained as closely as possible. """ if not size_can_be_smaller_than_factor and (height < factor or width < factor): raise ValueError( f"height:{height} or width:{width} must be larger than factor:{factor} " f"(when size_can_be_smaller_than_factor is False)" ) elif max_aspect_ratio_allowed is not None and max(height, width) / min(height, width) > max_aspect_ratio_allowed: raise ValueError( f"absolute aspect ratio must be smaller than {max_aspect_ratio_allowed}, " f"got {max(height, width) / min(height, width)}" f"(when max_aspect_ratio_allowed is not None)" ) h_bar = max(1, round(height / factor)) * factor w_bar = max(1, round(width / factor)) * factor if h_bar * w_bar > max_pixels: beta = math.sqrt((height * width) / max_pixels) h_bar = max(1, math.floor(height / beta / factor)) * factor w_bar = max(1, math.floor(width / beta / factor)) * factor elif h_bar * w_bar < min_pixels: beta = math.sqrt(min_pixels / (height * width)) h_bar = math.ceil(height * beta / factor) * factor w_bar = math.ceil(width * beta / factor) * factor return h_bar, w_bar def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type): """Project the coordinates to the absolute scale""" if coordinate_type == "relative": return int(round(x * screen_width)), int(round(y * screen_height)) elif coordinate_type == "absolute": return x, y elif coordinate_type == "qwen25": if 0 <= x <= 1 and 0 <= y <= 1: # If already normalized, treat like "relative" return int(round(x * screen_width)), int(round(y * screen_height)) height, width = smart_resize( height=screen_height, width=screen_width, factor=28, min_pixels=3136, max_pixels=12845056 # We use this max_pixels setting in our training data ) return int(x / width * screen_width), int(y / height * screen_height) else: raise ValueError(f"Unsupported coordinate type: {coordinate_type}") def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"): """Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.""" if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]: raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].") pattern = r'(pyautogui\.\w+\([^\)]*\))' matches = re.findall(pattern, pyautogui_code_relative_coordinates) new_code = pyautogui_code_relative_coordinates for full_call in matches: func_name_pattern = r'(pyautogui\.\w+)\((.*)\)' func_match = re.match(func_name_pattern, full_call, re.DOTALL) if not func_match: continue func_name = func_match.group(1) args_str = func_match.group(2) try: parsed = ast.parse(f"func({args_str})").body[0].value parsed_args = parsed.args parsed_keywords = parsed.keywords except SyntaxError: return pyautogui_code_relative_coordinates function_parameters = { 'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'], 'moveTo': ['x', 'y', 'duration', 'tween', 'pause'], 'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'], 'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'], 'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'], 'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'], } func_base_name = func_name.split('.')[-1] param_names = function_parameters.get(func_base_name, []) args = {} for idx, arg in enumerate(parsed_args): if idx < len(param_names): param_name = param_names[idx] arg_value = ast.literal_eval(arg) args[param_name] = arg_value try: for kw in parsed_keywords: param_name = kw.arg arg_value = ast.literal_eval(kw.value) args[param_name] = arg_value except Exception as e: logger.error(f"Error parsing keyword arguments: {e}") return pyautogui_code_relative_coordinates updated = False if 'x' in args and 'y' in args: try: x_rel = float(args['x']) y_rel = float(args['y']) x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type) logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.") args['x'] = x_abs args['y'] = y_abs updated = True except ValueError: pass if 'xOffset' in args and 'yOffset' in args: try: x_rel = float(args['xOffset']) y_rel = float(args['yOffset']) x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type) args['xOffset'] = x_abs args['yOffset'] = y_abs updated = True except ValueError: pass if updated: reconstructed_args = [] for idx, param_name in enumerate(param_names): if param_name in args: arg_value = args[param_name] if isinstance(arg_value, str): arg_repr = f"'{arg_value}'" else: arg_repr = str(arg_value) reconstructed_args.append(arg_repr) else: break used_params = set(param_names[:len(reconstructed_args)]) for kw in parsed_keywords: if kw.arg not in used_params: arg_value = args[kw.arg] if isinstance(arg_value, str): arg_repr = f"{kw.arg}='{arg_value}'" else: arg_repr = f"{kw.arg}={arg_value}" reconstructed_args.append(arg_repr) new_args_str = ', '.join(reconstructed_args) new_full_call = f"{func_name}({new_args_str})" new_code = new_code.replace(full_call, new_full_call) return new_code def extract_positions_and_instructions(code, action) -> list[dict]: """ Extracts all `(x, y)` coordinates (both positional and keyword arguments) and their associated preceding comments as instructions from Python code. If there are no comments, use the corresponding action instead. Args: code (str): The Python code as a string. action (str): The low-level action as a string. Returns: list[dict]: A list of dictionaries with extracted positions and instructions. - function (str): The pyautogui function name. - x (int or float): The x-coordinate. - y (int or float): The y-coordinate. - instruction (str): The preceding comment as an instruction. """ lines = code.splitlines() extracted = [] preceding_comment = action # To store the preceding comment for line in lines: preceding_comment = action # Check if the line is a comment and store it if line.strip().startswith("#"): preceding_comment = line.strip().lstrip("#").strip() # Clean the comment # Match pyautogui functions with positional arguments match_positional = re.match(r"(pyautogui\.\w+)\((\d+(\.\d+)?),\s*(\d+(\.\d+)?).*?\)", line) if match_positional: extracted.append({ "function": match_positional.group(1), # pyautogui function name "x": float(match_positional.group(2)) if '.' in match_positional.group(2)\ else int(match_positional.group(2)), # x-coordinate "y": float(match_positional.group(4)) if '.' in match_positional.group(4)\ else int(match_positional.group(3)), # y-coordinate "instruction": preceding_comment, # Use the preceding comment }) preceding_comment = None # Reset after associating it with a line continue # Match pyautogui functions with keyword arguments match_keyword = re.match(r"(pyautogui\.\w+)\(.*?x=(\d+(\.\d+)?),\s*y=(\d+(\.\d+)?).*?\)", line) if match_keyword: extracted.append({ "function": match_keyword.group(1), # pyautogui function name "x": float(match_keyword.group(2)) if '.' in match_keyword.group(2)\ else int(match_keyword.group(2)), # x-coordinate "y": float(match_keyword.group(4)) if '.' in match_keyword.group(4)\ else int(match_keyword.group(3)), # y-coordinate "instruction": preceding_comment, # Use the preceding comment }) preceding_comment = None # Reset after associating it with a line logger.info(f"Grounding extracted:\n{extracted}") return extracted def update_code_with_new_coordinates(code, updated_positions): """ Replaces old `(x, y)` coordinates (both positional and keyword arguments) with updated ones in the code, handling multiple occurrences correctly. Args: code (str): The original Python code as a string. updated_positions (list): A list of dictionaries with updated positions. Returns: str: The updated Python code. """ lines = code.splitlines() updated_code_lines = [] position_index = 0 # Tracks which position update to use for line in lines: if position_index < len(updated_positions): # Get the next update position update = updated_positions[position_index] function_pattern_positional = rf"{update['function']}\(\d+(\.\d+)?, \d+(\.\d+)?" function_pattern_keyword = rf"{update['function']}\(.*?x=\d+(\.\d+)?, y=\d+(\.\d+)?" if re.search(function_pattern_positional, line): # Replace positional arguments line = re.sub( function_pattern_positional, f"{update['function']}({update['x']}, {update['y']}", line, count=1 ) position_index += 1 # Move to the next update elif re.search(function_pattern_keyword, line): # Replace keyword arguments line = re.sub( function_pattern_keyword, f"{update['function']}(x={update['x']}, y={update['y']}", line, count=1 ) position_index += 1 # Move to the next update updated_code_lines.append(line) return "\n".join(updated_code_lines) def transform_agnet_action_to_code_block(action): """Transform the agent action to a code block: not used in agent, for logging only""" if "computer.terminate" in action or "browser.select_option" in action or "browser.clear" in action: return f"```code\n{action}\n```" else: return f"```python\n{action}\n```" class OpenCUAAgent: """ OpenCUA Agent for desktop automation tasks. This class implements a OpenCUA Model based agent that can observe desktop environments through screenshots and execute mouse/keyboard actions via PyAutoGUI to complete automation tasks. Attributes: model (str): Name of the language model being used history_type (str): Type of history recording mechanism actions (list): History of executed actions observations (list): History of environment observations cots (list): Chain of thought reasoning records """ def __init__( self, model: str, # OpenCUA model name history_type: str, # History step type: action_history, thought_history, observation_history max_image_history_length: int = 3, # The max number of images in the history platform: str = "ubuntu", # The platform of the computer max_tokens: int = 1500, # The max number of tokens in the response top_p: float = 0.9, # The top p value in the response temperature: float = 0, # The temperature value in the response action_space: str = "pyautogui", # The action space: pyautogui observation_type: str = "screenshot", # The observation type: screenshot cot_level: str = "l2", # The CoT level: l1, l2, l3 screen_size: Tuple[int, int] = (1920, 1080), # The screen size coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25 **kwargs ): assert coordinate_type in ["relative", "absolute", "qwen25"] assert action_space in ["pyautogui"], "Invalid action space" assert observation_type in ["screenshot"], "Invalid observation type" assert history_type in ["action_history", "thought_history", "observation_history"] assert model is not None, "Model cannot be None" self.model = model self.platform = platform self.max_tokens = max_tokens self.top_p = top_p self.temperature = temperature self.action_space = action_space self.observation_type = observation_type self.history_type = history_type self.coordinate_type = coordinate_type self.cot_level = cot_level self.screen_size = screen_size self.max_image_history_length = max_image_history_length if history_type == "action_history": self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE elif history_type == "thought_history": self.HISTORY_TEMPLATE = THOUGHT_HISTORY_TEMPLATE elif history_type == "observation_history": self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE else: raise ValueError(f"Invalid history type: {history_type}") if cot_level == "l3": self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L3 elif cot_level == "l2": self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L2 elif cot_level == "l1": self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L1 else: raise ValueError(f"Invalid COT level: {cot_level}") self.actions = [] self.observations = [] self.cots = [] def reset(self, _logger=None): global logger logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent") self.observations = [] self.cots = [] self.actions = [] def _scale_scroll_for_windows(self, code: str, factor: int = 50) -> str: """ pyautogui.scroll has a different scale on Ubuntu and Windows, multiple 'factor' when scrolling on Windows system""" if self.platform.lower() != "windows": return code pattern_pos = re.compile(r'(pyautogui\.scroll\()\s*([-+]?\d+)\s*\)') code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code) return code def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]: """ Predict the next action(s) based on the current observation. """ if "step_idx" in kwargs: logger.info(f"========= {self.model} Step {kwargs['step_idx']} =======") else: logger.info(f"========================== {self.model} ===================================") logger.info(f"Instruction: \n{instruction}") messages = [] messages.append({ "role": "system", "content": self.SYSTEM_PROMPT }) history_step_texts = [] for i in range(len(self.actions)): if i > len(self.actions) - self.max_image_history_length: messages.append({ "role": "user", "content": [ { "type": "image_url", "image_url": {"url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}"} } ] }) history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format( observation=self.cots[i].get('observation'), thought=self.cots[i].get('thought'), action=self.cots[i].get('action') ) messages.append({ "role": "assistant", "content": history_content }) else: history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format( observation=self.cots[i].get('observation'), thought=self.cots[i].get('thought'), action=self.cots[i].get('action') ) history_step_texts.append(history_content) if i == len(self.actions) - self.max_image_history_length: messages.append({ "role":"assistant", "content": "\n".join(history_step_texts) }) messages.append({ "role": "user", "content": [ { "type": "image_url", "image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"} }, { "type": "text", "text": INSTRUTION_TEMPLATE.format(instruction=instruction) } ] }) response = self.call_llm({ "model": self.model, "messages": messages, "max_tokens": self.max_tokens, "top_p": self.top_p, "temperature": self.temperature }, self.model) logger.info(f"Model Output: \n{response}") if not response: logger.error("No response found in the response.") return "ERROR", ["DONE"], {} low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type) if not pyautogui_actions or len(pyautogui_actions) == 0: logger.error("No pyautogui actions found in the response.") return response, ["FAIL"], {} pyautogui_actions = [ self._scale_scroll_for_windows(code) for code in pyautogui_actions ] self.observations.append(obs) logger.info(f"Parsed Low-level Action: \n{low_level_instruction}") logger.info(f"Parsed pyautogui Action: \n{pyautogui_actions}") self.actions.append(low_level_instruction) if 'action' not in other_cot or not other_cot['action'] or 'thought' not in other_cot or not other_cot['thought']: logger.error("Error! no action/thought in cot") logger.error(f"response: {response}") logger.error(f"cot: {other_cot}") self.cots.append(other_cot) # Print message structure if needed # messages_to_print = [] # current_image = 1 # for msg in messages: # msg_copy = copy.deepcopy(msg) # if isinstance(msg_copy['content'], list): # for content in msg_copy['content']: # if content['type'] == 'image_url': # content['image_url']['url'] = f'Image {current_image}' # current_image += 1 # messages_to_print.append(msg_copy) # messages_to_print.append({ # "new_step_cot": other_cot, # "response": response # }) # logger.info(json.dumps(messages_to_print, indent=2)) logger.info(f"New step cot: {other_cot}") return response, pyautogui_actions, {} @backoff.on_exception( backoff.constant, # here you should add more model exceptions as you want, # but you are forbidden to add "Exception", that is, a common type of exception # because we want to catch this kind of Exception in the outside to ensure # each example won't exceed the time limit ( Exception ), interval=30, max_tries=10 ) def call_llm(self, payload, model): """Call the LLM API""" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}" } for _ in range(30): response = httpx.post( os.environ['OPENCUA_URL'], headers=headers, json=payload, timeout=500, verify=False ) if response.status_code != 200: logger.error("Failed to call LLM: " + response.text) logger.error("Retrying...") time.sleep(5) else: response = response.json() finish_reason = response["choices"][0].get("finish_reason") if finish_reason is not None and finish_reason == "stop": # for most of the time, length will not exceed max_tokens return response['choices'][0]['message']['content'] else: logger.error("LLM did not finish properly, retrying...") time.sleep(5)