# OS-World c4d818c
import base64
import json
import logging
import os
import re
import tempfile
import time
from http import HTTPStatus
from io import BytesIO
from typing import Dict, List, Tuple
import backoff
import openai
import requests
from PIL import Image
from google.api_core.exceptions import InvalidArgument, ResourceExhausted, InternalServerError, BadRequest
from requests.exceptions import SSLError
from mm_agents.prompts import (
AGUVIS_PLANNER_SYS_PROMPT,
AGUVIS_SYS_PROMPT,
AGUVIS_PLANNING_PROMPT,
AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT,
AGUVIS_GROUNDING_PROMPT,
AGUVIS_GROUNDING_APPEND_PROMPT
)
# Module-level logger shared by the helpers and the agent below. It stays None
# until AguvisAgent.reset() installs one, so anything that logs must run after reset().
logger = None
# Function to encode the image
def encode_image(image_content):
    """Return the base64 text representation of raw image bytes.

    Args:
        image_content: Bytes-like object holding the image data.

    Returns:
        The base64 encoding of ``image_content`` decoded as a UTF-8 ``str``.
    """
    encoded_bytes = base64.b64encode(image_content)
    return encoded_bytes.decode('utf-8')
def encoded_img_to_pil_img(data_str):
    """Decode a base64 image string into a PIL image.

    Args:
        data_str: Base64 text; any ``data:image/png;base64,`` marker it
            contains is removed before decoding.

    Returns:
        The image object produced by ``Image.open`` on the decoded bytes.
    """
    payload = data_str.replace("data:image/png;base64,", "")
    raw_bytes = base64.b64decode(payload)
    return Image.open(BytesIO(raw_bytes))
def save_to_tmp_img_file(data_str):
    """Decode a base64 image string and write it to a fresh temporary directory.

    Args:
        data_str: Base64 text; any ``data:image/png;base64,`` marker it
            contains is removed before decoding.

    Returns:
        Path of the saved file, ``<new temp dir>/tmp_img.png``.
    """
    raw_bytes = base64.b64decode(data_str.replace("data:image/png;base64,", ""))
    # Open the image before creating the directory so a decode failure leaves
    # nothing behind on disk.
    picture = Image.open(BytesIO(raw_bytes))
    tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
    picture.save(tmp_img_path)
    return tmp_img_path
# FIXME: hardcoded screen size and planner system message
# Logical screen size as (width, height). It is the default used when scaling
# relative coordinates to absolute ones in the helpers below.
SCREEN_LOGIC_SIZE = (1280, 720)
def parse_code_from_planner_response(input_string: str) -> List[str]:
    """Extract the executable snippets from a planner response.

    The response is first normalised by treating ``;`` as a statement
    separator (each non-empty piece is trimmed and placed on its own line).
    If what remains is exactly ``WAIT``, ``DONE`` or ``FAIL`` that single
    command is returned. Otherwise every triple-backtick fenced block is
    collected; a block that ends with one of those commands on its last line
    is split into the preceding code and the command.

    Args:
        input_string: Raw text returned by the planner model.

    Returns:
        The code snippets and control commands in order of appearance; empty
        when the response has no fenced block and is not a bare command.
    """
    terminal_commands = ('WAIT', 'DONE', 'FAIL')

    pieces = (piece.strip() for piece in input_string.split(';'))
    normalized = "\n".join(piece for piece in pieces if piece)

    bare = normalized.strip()
    if bare in terminal_commands:
        return [bare]

    # Matches both ```code``` and ```python code```; the optional leading word
    # (the language tag) is skipped and the body is captured non-greedily.
    # DOTALL lets the body span several lines.
    fenced_blocks = re.findall(r"```(?:\w+\s+)?(.*?)```", normalized, re.DOTALL)

    codes: List[str] = []
    for block in fenced_blocks:
        snippet = block.strip()
        if snippet in terminal_commands:
            codes.append(snippet)
            continue
        *leading_lines, last_line = snippet.split('\n')
        if last_line in terminal_commands:
            if leading_lines:
                codes.append("\n".join(leading_lines))
            codes.append(last_line)
        else:
            codes.append(snippet)
    return codes
def parse_aguvis_response(input_string, screen_logic_size=SCREEN_LOGIC_SIZE) -> Tuple[str, List[str]]:
    """Split an Aguvis response into its instruction and executable pyautogui code.

    A response starting (case-insensitively) with ``wait``, ``done`` or
    ``fail`` is a control signal and yields that word, upper-cased, for both
    elements. Otherwise the first non-blank line is taken as the low-level
    instruction, and everything from the first ``assistantos`` marker or
    ``pyautogui`` line onwards is treated as code: keyword names are corrected
    and relative coordinates are converted to absolute ones.

    Args:
        input_string: Raw text returned by the Aguvis model.
        screen_logic_size: ``(width, height)`` used to scale coordinates.

    Returns:
        ``(low_level_instruction, code)``, or ``(None, None)`` when no code
        could be located or any step of the parsing raised.
    """
    lowered = input_string.lower()
    for keyword in ("wait", "done", "fail"):
        if lowered.startswith(keyword):
            signal = keyword.upper()
            return signal, signal

    try:
        lines = [ln for ln in input_string.strip().split("\n") if ln.strip() != ""]
        low_level_instruction = lines[0]

        # Index of the first line that either is the "assistantos" marker or
        # already starts with pyautogui code; -1 when there is none.
        pyautogui_index = next(
            (
                idx
                for idx, ln in enumerate(lines)
                if ln.strip() == "assistantos" or ln.strip().startswith("pyautogui")
            ),
            -1,
        )
        if pyautogui_index == -1:
            print(f"Error: Could not parse response {input_string}")
            return None, None

        relative_code = "\n".join(lines[pyautogui_index:]).replace("assistantos", "").strip()
        corrected_code = correct_pyautogui_arguments(relative_code)
        parsed_action = _pyautogui_code_to_absolute_coordinates(corrected_code, screen_logic_size)
        return low_level_instruction, parsed_action
    except Exception:
        print(f"Error: Could not parse response {input_string}")
        return None, None
def correct_pyautogui_arguments(code: str) -> str:
    """Rewrite misnamed keyword arguments in ``write``/``press``/``hotkey`` calls.

    Each line is trimmed and, when it starts with a ``pyautogui.write``,
    ``pyautogui.press`` or ``pyautogui.hotkey`` call, its arguments are
    rebuilt:

    * ``write(text=...)`` becomes ``write(message=...)``;
    * ``press(key=...)`` / ``press(button=...)`` become positional;
    * ``hotkey(key1=..., key2=..., keys=...)`` become positional.

    Other keyword arguments are kept (normalised to ``name=value``) and
    positional arguments are passed through. A rebuilt line ends at the call's
    closing parenthesis. Every other line is returned trimmed but otherwise
    unchanged.

    Args:
        code: One or more lines of pyautogui code.

    Returns:
        The corrected code, lines joined with ``\\n``.
    """
    # function name -> (keyword names to fix, keyword to use instead or None
    # when the value should become positional)
    renames = {
        'write': (('text',), 'message'),
        'press': (('key', 'button'), None),
        'hotkey': (('key1', 'key2', 'keys'), None),
    }
    call_re = re.compile(r'(pyautogui\.(\w+))\((.*)\)')
    kwarg_re = re.compile(r'(\w+)\s*=\s*(.*)')

    def fix_argument(raw_arg, wrong_names, replacement):
        """Return one argument with its keyword name corrected if needed."""
        arg = raw_arg.strip()
        kwarg = kwarg_re.match(arg)
        if not kwarg:
            return arg
        name, value = kwarg.group(1), kwarg.group(2)
        if name not in wrong_names:
            return f'{name}={value}'
        if replacement:
            return f"{replacement}={value}"
        return value

    fixed_lines = []
    for raw_line in code.strip().split('\n'):
        line = raw_line.strip()
        call = call_re.match(line)
        if not call or call.group(2) not in renames:
            fixed_lines.append(line)
            continue
        wrong_names, replacement = renames[call.group(2)]
        new_args = ', '.join(
            fix_argument(piece, wrong_names, replacement)
            for piece in split_args(call.group(3))
        )
        fixed_lines.append(f'{call.group(1)}({new_args})')
    return '\n'.join(fixed_lines)
def split_args(args_str: str) -> List[str]:
    """Split a call's argument text on top-level commas.

    A comma separates arguments only when it is outside string literals and
    outside any ``()``, ``[]`` or ``{}`` nesting, so values such as
    ``keys=['ctrl', 'c']`` or ``'a, b'`` stay in one piece. Backslash escapes
    inside strings are honoured, including an escaped backslash directly
    before the closing quote.

    Args:
        args_str: The text between a call's parentheses.

    Returns:
        The argument substrings with their original surrounding whitespace.
        Empty arguments between commas are kept; an empty trailing argument
        (after a final comma, or for empty input) is dropped.
    """
    args = []
    current = []
    quote = ''        # the quote character of the open string, '' when outside
    escaped = False   # True when the previous in-string character was a backslash
    depth = 0         # bracket nesting level outside strings

    for char in args_str:
        if quote:
            if escaped:
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == quote:
                quote = ''
        elif char in ('"', "'"):
            quote = char
        elif char in '([{':
            depth += 1
        elif char in ')]}':
            # Tolerate stray closers instead of going negative.
            if depth:
                depth -= 1
        elif char == ',' and depth == 0:
            args.append(''.join(current))
            current = []
            continue
        current.append(char)

    if current:
        args.append(''.join(current))
    return args
def extract_coordinates(text, logical_screen_size=SCREEN_LOGIC_SIZE) -> Tuple[int, int] | None:
    """Find the first ``(x, y)`` pair in ``text`` and scale it to the screen size.

    Accepts both ``(x=0.1, y=0.2)`` and ``(0.1, 0.2)``. Each number is
    multiplied by the matching entry of ``logical_screen_size`` and truncated
    to ``int``.

    Args:
        text: Model output expected to contain a parenthesised coordinate pair.
        logical_screen_size: ``(width, height)`` the numbers are scaled by.

    Returns:
        ``(x, y)`` as integers, or ``None`` when no pair with both values is found.
    """
    text = text.strip()
    logger.info(f"Extracting coordinates from: {text}")

    found = re.search(
        r'\((?:x=)?([-+]?\d*\.\d+|\d+)(?:,\s*(?:y=)?([-+]?\d*\.\d+|\d+))?\)',
        text,
    )
    if found is not None:
        raw_x, raw_y = found.group(1), found.group(2)
        x = int(float(raw_x) * logical_screen_size[0])
        # The y part of the pattern is optional; a lone x is not a usable result.
        if raw_y:
            return (x, int(float(raw_y) * logical_screen_size[1]))

    logger.info(f"Error: No coordinates found in: {text}")
    return None
def _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates, logical_screen_size=SCREEN_LOGIC_SIZE):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.

    Every ``pyautogui.<func>(...)`` call whose argument list contains no ``)``
    is inspected. Arguments named ``x``/``xOffset`` (by keyword, or by
    position for the functions listed below) are multiplied by the screen
    width, ``y``/``yOffset`` by the screen height, and the result is rounded
    to the nearest integer.

    All other arguments are copied through verbatim. Arguments keep their
    original order and their positional/keyword form, so the rewritten call
    binds its parameters exactly as the original did. Calls that do not parse,
    or that carry no numeric coordinate, are left untouched; a coordinate whose
    value is not a numeric literal (e.g. ``x=None`` or a variable) is left as
    written.

    Args:
        pyautogui_code_relative_coordinates: pyautogui code whose coordinates
            are expressed relative to the screen size.
        logical_screen_size: ``(width, height)`` to scale by.

    Returns:
        The code with coordinates replaced by absolute integer values.
    """
    import ast
    import re

    width, height = logical_screen_size
    # Screen extent each coordinate-like parameter is scaled by.
    axis_extent = {'x': width, 'y': height, 'xOffset': width, 'yOffset': height}
    # Leading positional parameter names, up to the last coordinate, for the
    # pyautogui functions that accept coordinates positionally.
    leading_params = {
        'click': ('x', 'y'),
        'doubleClick': ('x', 'y'),
        'tripleClick': ('x', 'y'),
        'rightClick': ('x', 'y'),
        'middleClick': ('x', 'y'),
        'mouseDown': ('x', 'y'),
        'mouseUp': ('x', 'y'),
        'moveTo': ('x', 'y'),
        'dragTo': ('x', 'y'),
        'moveRel': ('xOffset', 'yOffset'),
        'move': ('xOffset', 'yOffset'),
        'dragRel': ('xOffset', 'yOffset'),
        'drag': ('xOffset', 'yOffset'),
        'scroll': ('clicks', 'x', 'y'),
        'hscroll': ('clicks', 'x', 'y'),
        'vscroll': ('clicks', 'x', 'y'),
    }
    call_pattern = re.compile(r'(pyautogui\.\w+)\(([^\)]*)\)')

    def scaled_literal(node, param_name):
        """Return the absolute value for ``node`` as source text, or None if it is not a coordinate."""
        extent = axis_extent.get(param_name)
        if extent is None:
            return None
        try:
            return str(int(round(float(ast.literal_eval(node)) * extent)))
        except (ValueError, TypeError, SyntaxError, OverflowError):
            # Not a numeric literal (variable, None, inf, ...): keep as written.
            return None

    def convert_call(match):
        """Rewrite one matched call, or return it unchanged when nothing applies."""
        func_name, args_str = match.group(1), match.group(2)
        source = f"func({args_str})"
        try:
            call = ast.parse(source).body[0].value
        except (SyntaxError, ValueError, AttributeError):
            return match.group(0)
        if not isinstance(call, ast.Call):
            return match.group(0)

        names = leading_params.get(func_name.split('.')[-1], ())
        parts = []
        updated = False

        for idx, node in enumerate(call.args):
            text = ast.get_source_segment(source, node) or ast.unparse(node)
            absolute = scaled_literal(node, names[idx]) if idx < len(names) else None
            if absolute is not None:
                text = absolute
                updated = True
            parts.append(text)

        for kw in call.keywords:
            text = ast.get_source_segment(source, kw.value) or ast.unparse(kw.value)
            if kw.arg is None:
                # ``**mapping`` argument: nothing to scale, keep it intact.
                parts.append(f"**{text}")
                continue
            absolute = scaled_literal(kw.value, kw.arg)
            if absolute is not None:
                text = absolute
                updated = True
            parts.append(f"{kw.arg}={text}")

        if not updated:
            return match.group(0)
        return f"{func_name}({', '.join(parts)})"

    # A single substitution pass converts each call exactly once, so output of
    # one conversion can never be picked up and scaled a second time.
    return call_pattern.sub(convert_call, pyautogui_code_relative_coordinates)
class AguvisAgent:
    """Desktop agent built on an Aguvis executor model, optionally guided by a planner LLM.

    With ``planner_model=None`` the executor model is prompted with the task,
    the action history and the screenshot, and its answer is parsed into
    pyautogui code. Otherwise the planner model produces pyautogui code and the
    executor is only asked to ground commented mouse actions in the screenshot.

    ``reset()`` must be called before ``predict()``: it installs the
    module-level ``logger`` that the methods here log through.
    """

    # (size tag looked for in the model name, chat-completions endpoint),
    # tested in this order.
    _AGUVIS_ENDPOINTS = (
        ("7b", "http://101.132.136.195:7908/v1/chat/completions"),
        ("72b", "http://123.57.10.166:7908/v1/chat/completions"),
    )

    def __init__(
        self,
        platform="ubuntu",
        planner_model="gpt-4o",
        executor_model="qwen-aguvis-7b",
        max_tokens=1500,
        top_p=0.9,
        temperature=0.5,
        action_space="pyautogui",
        observation_type="screenshot",
    ):
        """Store the configuration and start with an empty history.

        Args:
            platform: Name of the target platform.
            planner_model: Model used for planning, or ``None`` to let the
                executor model plan on its own.
            executor_model: Aguvis model name; must not be ``None``.
            max_tokens: ``max_tokens`` sent with every LLM request.
            top_p: ``top_p`` sent with every LLM request.
            temperature: ``temperature`` sent with every LLM request.
            action_space: Only ``"pyautogui"`` is accepted.
            observation_type: Only ``"screenshot"`` is accepted.

        Raises:
            AssertionError: If ``executor_model`` is ``None`` or an unsupported
                action space / observation type is given.
        """
        self.platform = platform
        self.planner_model = planner_model
        self.executor_model = executor_model
        assert self.executor_model is not None, "Executor model cannot be None"
        self.max_tokens = max_tokens
        self.top_p = top_p
        self.temperature = temperature
        self.action_space = action_space
        self.observation_type = observation_type
        assert action_space in ["pyautogui"], "Invalid action space"
        assert observation_type in ["screenshot"], "Invalid observation type"

        self.thoughts = []
        # Initialised here as well as in reset() so the attribute always exists.
        self.action_descriptions = []
        self.actions = []
        self.observations = []

    def predict(self, instruction: str, obs: Dict) -> Tuple[str, List]:
        """
        Predict the next action(s) based on the current observation.

        Args:
            instruction: The task description.
            obs: Observation dict; ``obs['screenshot']`` holds the raw image bytes.

        Returns:
            ``(response, actions)``. Without a planner, ``response`` is the raw
            executor output and ``actions`` holds the single parsed action
            (``None`` if parsing failed). With a planner, ``response`` is ``""``
            and ``actions`` holds one entry per code snippet from the planner.
        """
        if self.planner_model is None:
            return self._predict_with_executor(instruction, obs)
        return self._predict_with_planner(instruction, obs)

    def _sampling_payload(self, model, messages):
        """Build the chat-completions payload shared by every request."""
        return {
            "model": model,
            "messages": messages,
            "max_tokens": self.max_tokens,
            "top_p": self.top_p,
            "temperature": self.temperature
        }

    def _predict_with_executor(self, instruction, obs):
        """Let the executor model plan and ground one step by itself."""
        if self.actions:
            previous_actions = "\n".join(
                f"Step {i+1}: {action}" for i, action in enumerate(self.actions)
            )
        else:
            previous_actions = "None"

        aguvis_messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": AGUVIS_SYS_PROMPT}]
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": AGUVIS_PLANNING_PROMPT.format(
                            instruction=instruction,
                            previous_actions=previous_actions,
                        )
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
                    }
                ],
            },
            {
                # Pre-filled assistant turn that the model continues from.
                "role": "assistant",
                "content": [
                    {"type": "text", "text": AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT}
                ]
            },
        ]
        aguvis_response = self.call_llm(
            self._sampling_payload(self.executor_model, aguvis_messages),
            self.executor_model,
        )
        logger.info(f"Aguvis Output: {aguvis_response}")
        low_level_instruction, pyautogui_actions = parse_aguvis_response(aguvis_response)

        self.actions.append(low_level_instruction)
        return aguvis_response, [pyautogui_actions]

    def _predict_with_planner(self, instruction, obs):
        """Ask the planner for code, then ground its mouse actions with the executor."""
        # FIXME [junli]:
        # Using an external planner (GPT-4o) requires relying on more
        # detailed prompt to provide Aguvis with low level instructions.
        # So we temporarily separate the planner prompt and aguvis prompt.
        planner_messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": AGUVIS_PLANNER_SYS_PROMPT}]
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"You are asked to complete the following task: {instruction}"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
                            "detail": "high"
                        }
                    }
                ]
            },
        ]
        planner_response = self.call_llm(
            self._sampling_payload(self.planner_model, planner_messages),
            self.planner_model,
        )
        logger.info(f"Planner output: {planner_response}")

        snippets = parse_code_from_planner_response(planner_response)
        pyautogui_actions = [
            self.convert_action_to_grounding_model_instruction(snippet, obs, instruction)
            for snippet in snippets
        ]
        return "", pyautogui_actions

    def convert_action_to_grounding_model_instruction(
        self, line: str, obs: Dict, instruction: str
    ) -> str:
        """Replace planner-guessed coordinates with ones grounded by the executor model.

        Looks for a ``#`` comment followed, on the next line, by a
        ``pyautogui.moveTo``/``click``/``rightClick`` call with integer
        coordinates (optionally followed by ``duration=``). For each such pair
        the comment text is sent to the executor model together with the
        screenshot, and the call's coordinates are replaced by the ones
        extracted from the answer. If no coordinates can be extracted, the
        planner's original call is kept.

        Args:
            line: A code snippet from the planner.
            obs: Observation dict; ``obs['screenshot']`` holds the raw image bytes.
            instruction: The task description (not used by this method).

        Returns:
            The snippet with grounded coordinates, or ``line`` unchanged when
            it contains no matching comment/action pair.
        """
        pattern = r'(#.*?)\n(pyautogui\.(moveTo|click|rightClick)\((?:x=)?(\d+)(?:,\s*|\s*,\s*y=)(\d+)(?:,\s*duration=[\d.]+)?\))'
        matches = re.findall(pattern, line, re.DOTALL)
        if not matches:
            return line

        # The screenshot is the same for every action in this snippet.
        screenshot_url = f"data:image/png;base64,{encode_image(obs['screenshot'])}"

        new_instruction = line
        for match in matches:
            comment = match[0].split("#")[1].strip()
            original_action = match[1]
            func_name = match[2].strip()

            aguvis_messages = [
                {
                    "role": "system",
                    "content": [{"type": "text", "text": AGUVIS_SYS_PROMPT}]
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": screenshot_url,
                                "detail": "high",
                            },
                        },
                        {
                            "type": "text",
                            "text": '\n' + comment,
                        },
                    ],
                },
                {
                    # Pre-filled assistant turn that the model continues from.
                    "role": "assistant",
                    "content": [
                        {"type": "text", "text": AGUVIS_GROUNDING_APPEND_PROMPT.format(function_name=func_name)}
                    ],
                },
            ]
            grounding_response = self.call_llm(
                self._sampling_payload(self.executor_model, aguvis_messages),
                self.executor_model,
            )
            coordinates = extract_coordinates(grounding_response, SCREEN_LOGIC_SIZE)
            if coordinates is None:
                # Grounding failed (e.g. empty response after an HTTP error):
                # keep the planner's own coordinates instead of crashing.
                logger.warning("No grounded coordinates for action: %s", original_action)
                continue

            # FIXME [junli]: Use ast to reconstruct the action with coordinates
            call_head, _, call_tail = original_action.partition('(')
            new_action = f"{call_head}({coordinates[0]}, {coordinates[1]}"
            if 'duration' in call_tail:
                # The last comma-separated piece is "duration=...)", closing
                # parenthesis included.
                new_action += f", {call_tail.split(',')[-1].strip()}"
            else:
                new_action += ")"
            logger.info(new_action)
            new_instruction = new_instruction.replace(original_action, new_action)
        return new_instruction

    @backoff.on_exception(
        backoff.constant,
        # here you should add more model exceptions as you want,
        # but you are forbidden to add "Exception", that is, a common type of exception
        # because we want to catch this kind of Exception in the outside to ensure
        # each example won't exceed the time limit
        (
            # General exceptions
            SSLError,
            # OpenAI exceptions
            openai.RateLimitError,
            openai.BadRequestError,
            openai.InternalServerError,
            # Google exceptions
            InvalidArgument,
            ResourceExhausted,
            InternalServerError,
            BadRequest,
            # Groq exceptions
            # todo: check
        ),
        interval=30,
        max_tries=10
    )
    def call_llm(self, payload, model):
        """Send a chat-completions request to the backend selected by ``model``.

        Args:
            payload: JSON body of the request.
            model: Model name. Names starting with ``gpt`` go to the OpenAI
                API (using ``OPENAI_API_KEY`` from the environment); names
                containing ``aguvis`` go to the endpoint matching their size tag.

        Returns:
            The content of the first choice's message, or ``""`` when the
            server answers with a non-200 status.

        Raises:
            Exception: If an Aguvis model name carries no known size tag.
            ValueError: If ``model`` matches no supported backend.
        """
        if model.startswith("gpt"):
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"
            }
            logger.info("Generating content with GPT model: %s", model)
            return self._post_chat_completion(
                "https://api.openai.com/v1/chat/completions", headers, payload
            )

        if "aguvis" in model:
            headers = {
                "Content-Type": "application/json",
            }
            logger.info("Generating content with Aguvis model: %s", model)
            endpoint = next(
                (url for tag, url in self._AGUVIS_ENDPOINTS if tag in model), None
            )
            if endpoint is None:
                raise Exception("Unsupported Aguvis model version")
            return self._post_chat_completion(endpoint, headers, payload)

        # Fail loudly here rather than returning None and crashing later in
        # the response parsers.
        raise ValueError(f"Unsupported model: {model}")

    @staticmethod
    def _post_chat_completion(url, headers, payload):
        """POST ``payload`` to ``url`` and return the first choice's message content."""
        # NOTE(review): no timeout is passed, so a stalled server blocks this
        # call indefinitely; consider adding one.
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != 200:
            logger.error("Failed to call LLM: %s", response.text)
            time.sleep(5)
            return ""
        return response.json()['choices'][0]['message']['content']

    def reset(self, _logger=None):
        """Install the module-level logger and clear the agent's history.

        Args:
            _logger: Logger to use; defaults to ``desktopenv.aguvis_agent``.
        """
        global logger
        logger = _logger if _logger is not None else logging.getLogger("desktopenv.aguvis_agent")

        self.thoughts = []
        self.action_descriptions = []
        self.actions = []
        self.observations = []