/
OS-World523d553
'''
The code is mainly based on:
- Jedi https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/jedi_7b_agent.py
- AgentS2 https://github.com/simular-ai/Agent-S
'''
import base64
import json
import logging
import os
import re
import time
from io import BytesIO
import backoff
import openai
import requests
from PIL import Image
from google.api_core.exceptions import (
InvalidArgument,
ResourceExhausted,
InternalServerError,
BadRequest,
)
from requests.exceptions import SSLError
import os
from mm_agents.prompts import GTA1_PLANNER_SYSTEM_PROMPT, GTA1_GROUNDING_SYSTEM_PROMPT, GTA1_JUDGE_SYSTEM_PROMPT
from mm_agents.utils.qwen_vl_utils import smart_resize
from pytesseract import Output
import pytesseract
import inspect
import textwrap
import ast
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from openai import OpenAI, APIConnectionError, APIError, RateLimitError
import cv2
logger = None
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY",None) #"Your OpenAI API Key"
GTA1_API_KEY = os.environ.get("GTA1_API_KEY",None) #"Your GTA1 API Key"
GTA1_MODEL_NMAE = os.environ.get("GTA1_API_KEY",None) #Your served model name
GTA1_SERVICE_URL = os.environ.get("GTA1_SERVICE_URL",None) #"Your GTA1 Service URL"
proxies = None # Your proxies
MAX_RETRY_TIMES = 20
def encode_image(image_content):
return base64.b64encode(image_content).decode("utf-8")
class LMMEngineOpenAI:
'''
functions borrow from https://github.com/simular-ai/Agent-S/blob/main/gui_agents/s2/core/engine.py#L247
'''
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
api_key = api_key or os.getenv("OPENAI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named OPENAI_API_KEY"
)
self.base_url = base_url
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
if not self.base_url:
self.llm_client = OpenAI(api_key=self.api_key)
else:
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
return (
self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 4096,
#temperature=temperature,
**kwargs,
)
.choices[0]
.message.content
)
class LMMAgent:
'''
functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/core/mllm.py#L16
'''
def __init__(self, engine_params=None, system_prompt=None, engine=None):
if engine is None:
if engine_params is not None:
engine_type = engine_params.get("engine_type")
if engine_type == "openai":
self.engine = LMMEngineOpenAI(**engine_params)
else:
raise ValueError("engine_type is not supported")
else:
raise ValueError("engine_params must be provided")
else:
self.engine = engine
self.messages = []
if system_prompt:
self.add_system_prompt(system_prompt)
else:
self.add_system_prompt("You are a helpful assistant.")
def encode_image(self, image_content):
# if image_content is a path to an image file, check type of the image_content to verify
if isinstance(image_content, str):
with open(image_content, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
else:
return base64.b64encode(image_content).decode("utf-8")
def reset(
self,
):
self.messages = [
{
"role": "system",
"content": [{"type": "text", "text": self.system_prompt}],
}
]
def add_system_prompt(self, system_prompt):
self.system_prompt = system_prompt
if len(self.messages) > 0:
self.messages[0] = {
"role": "system",
"content": [{"type": "text", "text": self.system_prompt}],
}
else:
self.messages.append(
{
"role": "system",
"content": [{"type": "text", "text": self.system_prompt}],
}
)
def remove_message_at(self, index):
"""Remove a message at a given index"""
if index < len(self.messages):
self.messages.pop(index)
def replace_message_at(
self, index, text_content, image_content=None, image_detail="high"
):
"""Replace a message at a given index"""
if index < len(self.messages):
self.messages[index] = {
"role": self.messages[index]["role"],
"content": [{"type": "text", "text": text_content}],
}
if image_content:
base64_image = self.encode_image(image_content)
self.messages[index]["content"].append(
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}",
"detail": image_detail,
},
}
)
def add_message(
self,
text_content,
image_content=None,
role=None,
image_detail="high",
put_text_last=False,
):
"""Add a new message to the list of messages"""
# API-style inference from OpenAI and AzureOpenAI
if isinstance(
self.engine,
(
LMMEngineOpenAI,
),
):
# infer role from previous message
if role != "user":
if self.messages[-1]["role"] == "system":
role = "user"
elif self.messages[-1]["role"] == "user":
role = "assistant"
elif self.messages[-1]["role"] == "assistant":
role = "user"
message = {
"role": role,
"content": [{"type": "text", "text": text_content}],
}
if isinstance(image_content, np.ndarray) or image_content:
# Check if image_content is a list or a single image
if isinstance(image_content, list):
# If image_content is a list of images, loop through each image
for image in image_content:
base64_image = self.encode_image(image)
message["content"].append(
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}",
"detail": image_detail,
},
}
)
else:
# If image_content is a single image, handle it directly
base64_image = self.encode_image(image_content)
message["content"].append(
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}",
"detail": image_detail,
},
}
)
# Rotate text to be the last message if desired
if put_text_last:
text_content = message["content"].pop(0)
message["content"].append(text_content)
self.messages.append(message)
else:
raise ValueError("engine_type is not supported")
def get_response(
self,
user_message=None,
messages=None,
temperature=0.0,
max_new_tokens=None,
**kwargs,
):
"""Generate the next response based on previous messages"""
if messages is None:
messages = self.messages
if user_message:
messages.append(
{"role": "user", "content": [{"type": "text", "text": user_message}]}
)
return self.engine.generate(
messages,
temperature=temperature,
max_new_tokens=max_new_tokens,
**kwargs,
)
def agent_action(func):
func.is_agent_action = True
return func
UBUNTU_APP_SETUP = f"""import subprocess;
import difflib;
import pyautogui;
pyautogui.press('escape');
time.sleep(0.5);
output = subprocess.check_output(['wmctrl', '-lx']);
output = output.decode('utf-8').splitlines();
window_titles = [line.split(None, 4)[2] for line in output];
closest_matches = difflib.get_close_matches('APP_NAME', window_titles, n=1, cutoff=0.1);
if closest_matches:
closest_match = closest_matches[0];
for line in output:
if closest_match in line:
window_id = line.split()[0]
break;
subprocess.run(['wmctrl', '-ia', window_id])
subprocess.run(['wmctrl', '-ir', window_id, '-b', 'add,maximized_vert,maximized_horz'])
"""
SET_CELL_VALUES_CMD = """import uno
import subprocess
def identify_document_type(component):
if component.supportsService("com.sun.star.sheet.SpreadsheetDocument"):
return "Calc"
if component.supportsService("com.sun.star.text.TextDocument"):
return "Writer"
if component.supportsService("com.sun.star.sheet.PresentationDocument"):
return "Impress"
return None
def cell_ref_to_indices(cell_ref):
column_letters = ''.join(filter(str.isalpha, cell_ref))
row_number = ''.join(filter(str.isdigit, cell_ref))
col = sum((ord(char.upper()) - ord('A') + 1) * (26**idx) for idx, char in enumerate(reversed(column_letters))) - 1
row = int(row_number) - 1
return col, row
def set_cell_values(new_cell_values: dict[str, str], app_name: str = "Untitled 1", sheet_name: str = "Sheet1"):
new_cell_values_idx = {{}}
for k, v in new_cell_values.items():
try:
col, row = cell_ref_to_indices(k)
except:
col = row = None
if col is not None and row is not None:
new_cell_values_idx[(col, row)] = v
# Clean up previous TCP connections.
subprocess.run(
'echo \"password\" | sudo -S ss --kill --tcp state TIME-WAIT sport = :2002',
shell=True,
check=True,
text=True,
capture_output=True
)
# Dynamically allow soffice to listen on port 2002.
subprocess.run(
[
"soffice",
"--accept=socket,host=localhost,port=2002;urp;StarOffice.Service"
]
)
local_context = uno.getComponentContext()
resolver = local_context.ServiceManager.createInstanceWithContext(
"com.sun.star.bridge.UnoUrlResolver", local_context
)
context = resolver.resolve(
f"uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext"
)
desktop = context.ServiceManager.createInstanceWithContext(
"com.sun.star.frame.Desktop", context
)
# Collect all LibreOffice-related opened windows.
documents = []
for i, component in enumerate(desktop.Components):
title = component.Title
doc_type = identify_document_type(component)
documents.append((i, component, title, doc_type))
# Find the LibreOffice Calc app and the sheet of interest.
spreadsheet = [doc for doc in documents if doc[3] == "Calc"]
selected_spreadsheet = [doc for doc in spreadsheet if doc[2] == app_name]
if spreadsheet:
try:
if selected_spreadsheet:
spreadsheet = selected_spreadsheet[0][1]
else:
spreadsheet = spreadsheet[0][1]
sheet = spreadsheet.Sheets.getByName(sheet_name)
except:
raise ValueError(f"Could not find sheet {{sheet_name}} in {{app_name}}.")
for (col, row), value in new_cell_values_idx.items():
cell = sheet.getCellByPosition(col, row)
# Set the cell value.
if isinstance(value, (int, float)):
cell.Value = value
elif isinstance(value, str):
if value.startswith("="):
cell.Formula = value
else:
cell.String = value
elif isinstance(value, bool):
cell.Value = 1 if value else 0
elif value is None:
cell.clearContents(0)
else:
raise ValueError(f"Unsupported cell value type: {{type(value)}}")
else:
raise ValueError(f"Could not find LibreOffice Calc app corresponding to {{app_name}}.")
set_cell_values(new_cell_values={cell_values}, app_name="{app_name}", sheet_name="{sheet_name}")
"""
class OSWorldACI:
'''
classes borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/agents/grounding.py#L159
'''
PHRASE_TO_WORD_COORDS_PROMPT = textwrap.dedent(
"""
You are an expert in graphical user interfaces. Your task is to process a phrase of text, and identify the most relevant word on the computer screen.
You are provided with a phrase, a table with all the text on the screen, and a screenshot of the computer screen. You will identify the single word id that is best associated with the provided phrase.
This single word must be displayed on the computer screenshot, and its location on the screen should align with the provided phrase.
Each row in the text table provides 2 pieces of data in the following order. 1st is the unique word id. 2nd is the corresponding word.
To be successful, it is very important to follow all these rules:
1. First, think step by step and generate your reasoning about which word id to click on.
2. Then, output the unique word id. Remember, the word id is the 1st number in each row of the text table.
3. If there are multiple occurrences of the same word, use the surrounding context in the phrase to choose the correct one. Pay very close attention to punctuation and capitalization.
"""
)
def __init__(
self,
platform: 'linux',
width: int = 1920,
height: int = 1080,
):
self.platform = (
platform # Dictates how the switch_applications agent action works.
)
engine_params_for_generation = engine_params = {
"engine_type": 'openai',
"model": 'o3',
"base_url": '',
"api_key": os.environ.get("OPENAI_API_KEY", ""),
}
# Configure scaling
self.width = width
self.height = height
# Maintain state for save_to_knowledge
self.notes = []
# Coordinates used during ACI execution
self.coords1 = None
self.coords2 = None
# Configure text grounding agent
self.text_span_agent = LMMAgent(
engine_params=engine_params_for_generation,
system_prompt=self.PHRASE_TO_WORD_COORDS_PROMPT,
)
self.dummy_agent = DummyAgent(platform=platform)
# Given the state and worker's referring expression, use the grounding model to generate (x,y)
def generate_coords(self, ref_expr: str, obs: Dict, request_vllm) -> List[int]:
return request_vllm(image=obs["screenshot"], prompt=ref_expr)
# Calls pytesseract to generate word level bounding boxes for text grounding
def get_ocr_elements(self, b64_image_data: str) -> Tuple[str, List]:
image = Image.open(BytesIO(b64_image_data))
image_data = pytesseract.image_to_data(image, output_type=Output.DICT)
# Clean text by removing leading and trailing spaces and non-alphabetical characters, but keeping punctuation
for i, word in enumerate(image_data["text"]):
image_data["text"][i] = re.sub(
r"^[^a-zA-Z\s.,!?;:\-\+]+|[^a-zA-Z\s.,!?;:\-\+]+$", "", word
)
ocr_elements = []
ocr_table = "Text Table:\nWord id\tText\n"
# Obtain the <id, text, group number, word number> for each valid element
grouping_map = defaultdict(list)
ocr_id = 0
for i in range(len(image_data["text"])):
block_num = image_data["block_num"][i]
if image_data["text"][i]:
grouping_map[block_num].append(image_data["text"][i])
ocr_table += f"{ocr_id}\t{image_data['text'][i]}\n"
ocr_elements.append(
{
"id": ocr_id,
"text": image_data["text"][i],
"group_num": block_num,
"word_num": len(grouping_map[block_num]),
"left": image_data["left"][i],
"top": image_data["top"][i],
"width": image_data["width"][i],
"height": image_data["height"][i],
}
)
ocr_id += 1
return ocr_table, ocr_elements
# Given the state and worker's text phrase, generate the coords of the first/last word in the phrase
def generate_text_coords(
self, phrase: str, obs: Dict, alignment: str = ""
) -> List[int]:
ocr_table, ocr_elements = self.get_ocr_elements(obs["screenshot"])
alignment_prompt = ""
if alignment == "start":
alignment_prompt = "**Important**: Output the word id of the FIRST word in the provided phrase.\n"
elif alignment == "end":
alignment_prompt = "**Important**: Output the word id of the LAST word in the provided phrase.\n"
# Load LLM prompt
self.text_span_agent.reset()
self.text_span_agent.add_message(
alignment_prompt + "Phrase: " + phrase + "\n" + ocr_table, role="user"
)
self.text_span_agent.add_message(
"Screenshot:\n", image_content=obs["screenshot"], role="user"
)
# Obtain the target element
response = call_llm_safe(self.text_span_agent)
#print("TEXT SPAN AGENT RESPONSE:", response)
numericals = re.findall(r"\d+", response)
if len(numericals) > 0:
text_id = int(numericals[-1])
else:
text_id = 0
elem = ocr_elements[text_id]
# Compute the element coordinates
if alignment == "start":
coords = [elem["left"], elem["top"] + (elem["height"] // 2)]
elif alignment == "end":
coords = [elem["left"] + elem["width"], elem["top"] + (elem["height"] // 2)]
else:
coords = [
elem["left"] + (elem["width"] // 2),
elem["top"] + (elem["height"] // 2),
]
return coords
# Takes a description based action and assigns the coordinates for any coordinate based action
# Raises an error if function can't be parsed
def assign_coordinates(self, plan: str, obs: Dict, request_vllm):
# Reset coords from previous action generation
self.coords1, self.coords2 = None, None
try:
# Extract the function name and args
action = parse_single_code_from_string(plan.split("Grounded Action")[-1])
function_name = re.match(r"(\w+\.\w+)\(", action).group(1)
args = self.parse_function_args(action)
except Exception as e:
raise RuntimeError(f"Error in parsing grounded action: {e}") from e
# arg0 is a description
if (
function_name in ["agent.click", "agent.type", "agent.scroll"]
and len(args) >= 1
and args[0] != None
):
self.coords1 = self.generate_coords(args[0], obs, request_vllm)
# arg0 and arg1 are descriptions
elif function_name == "agent.drag_and_drop" and len(args) >= 2:
self.coords1 = self.generate_coords(args[0], obs, request_vllm)
self.coords2 = self.generate_coords(args[1], obs, request_vllm)
# arg0 and arg1 are text phrases
elif function_name == "agent.highlight_text_span" and len(args) >= 2:
self.coords1 = self.generate_text_coords(args[0], obs, alignment="start")
self.coords2 = self.generate_text_coords(args[1], obs, alignment="end")
# Resize from grounding model dim into OSWorld dim (1920 * 1080)
def resize_coordinates(self, coordinates: List[int]) -> List[int]:
return [
round(coordinates[0] * self.width),
round(coordinates[1] * self.height),
]
# Given a generated ACI function, returns a list of argument values, where descriptions are at the front of the list
def parse_function_args(self, function: str) -> List[str]:
tree = ast.parse(function)
call_node = tree.body[0].value
def safe_eval(node):
if isinstance(
node, ast.Constant
): # Handles literals like numbers, strings, etc.
return node.value
else:
return ast.unparse(node) # Return as a string if not a literal
positional_args = [safe_eval(arg) for arg in call_node.args]
keyword_args = {kw.arg: safe_eval(kw.value) for kw in call_node.keywords}
res = []
for key, val in keyword_args.items():
if "description" in key:
res.append(val)
for arg in positional_args:
res.append(arg)
return res
def click(
self,
instruction: str,
num_clicks: int = 1,
button_type: str = "left",
hold_keys: List = [],
):
"""Click on the element
Args:
instruction:str, decribe the element you want to interact with in detail including the visual description and function description. And make it clear and concise. For example you can describe what the element looks like, and what will be the expected result when you interact with it.
num_clicks:int, number of times to click the element
button_type:str, which mouse button to press can be "left", "middle", or "right"
hold_keys:List, list of keys to hold while clicking
"""
x, y = self.resize_coordinates(self.coords1)
command = "import pyautogui; "
# TODO: specified duration?
for k in hold_keys:
command += f"pyautogui.keyDown({repr(k)}); "
command += f"""import pyautogui; pyautogui.click({x}, {y}, clicks={num_clicks}, button={repr(button_type)}); """
for k in hold_keys:
command += f"pyautogui.keyUp({repr(k)}); "
# Return pyautoguicode to click on the element
return command
def switch_applications(self, app_code):
"""Switch to a different application that is already open
Args:
app_code:str the code name of the application to switch to from the provided list of open applications
"""
if self.platform == "darwin":
return f"import pyautogui; import time; pyautogui.hotkey('command', 'space', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)"
elif self.platform == "linux":
return UBUNTU_APP_SETUP.replace("APP_NAME", app_code)
elif self.platform == "windows":
return f"import pyautogui; import time; pyautogui.hotkey('win', 'd', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)"
def open(self, app_or_filename: str):
"""Open any application or file with name app_or_filename. Use this action to open applications or files on the desktop, do not open manually.
Args:
app_or_filename:str, the name of the application or filename to open
"""
return f"import pyautogui; pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)"
def type(
self,
element_description: Optional[str] = None,
text: str = "",
overwrite: bool = False,
enter: bool = False,
):
"""Type text into a specific element
Args:
element_description:str, a detailed description of which element to enter text in. This description should be at least a full sentence.
text:str, the text to type
overwrite:bool, Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element.
enter:bool, Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False.
"""
if self.coords1 is not None:
# If a node is found, retrieve its coordinates and size
# Start typing at the center of the element
x, y = self.resize_coordinates(self.coords1)
command = "import pyautogui; "
command += f"pyautogui.click({x}, {y}); "
if overwrite:
command += (
f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); "
)
command += f"pyautogui.write({repr(text)}); "
if enter:
command += "pyautogui.press('enter'); "
else:
# If no element is found, start typing at the current cursor location
command = "import pyautogui; "
if overwrite:
command += (
f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); "
)
command += f"pyautogui.write({repr(text)}); "
if enter:
command += "pyautogui.press('enter'); "
return command
def drag_and_drop(
self, starting_description: str, ending_description: str, hold_keys: List = []
):
"""Drag from the starting description to the ending description
Args:
starting_description:str, a very detailed description of where to start the drag action. This description should be at least a full sentence. And make it clear and concise.
ending_description:str, a very detailed description of where to end the drag action. This description should be at least a full sentence. And make it clear and concise.
hold_keys:List list of keys to hold while dragging
"""
x1, y1 = self.resize_coordinates(self.coords1)
x2, y2 = self.resize_coordinates(self.coords2)
command = "import pyautogui; "
command += f"pyautogui.moveTo({x1}, {y1}); "
# TODO: specified duration?
for k in hold_keys:
command += f"pyautogui.keyDown({repr(k)}); "
command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "
for k in hold_keys:
command += f"pyautogui.keyUp({repr(k)}); "
# Return pyautoguicode to drag and drop the elements
return command
def highlight_text_span(self, starting_phrase: str, ending_phrase: str):
"""Highlight a text span between a provided starting phrase and ending phrase. Use this to highlight words, lines, and paragraphs.
Args:
starting_phrase:str, the phrase that denotes the start of the text span you want to highlight. If you only want to highlight one word, just pass in that single word.
ending_phrase:str, the phrase that denotes the end of the text span you want to highlight. If you only want to highlight one word, just pass in that single word.
"""
x1, y1 = self.coords1
x2, y2 = self.coords2
command = "import pyautogui; "
command += f"pyautogui.moveTo({x1}, {y1}); "
command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "
# Return pyautoguicode to drag and drop the elements
return command
def set_cell_values(
self, cell_values: Dict[str, Any], app_name: str, sheet_name: str
):
"""Use this to set individual cell values in a spreadsheet. For example, setting A2 to "hello" would be done by passing {"A2": "hello"} as cell_values. The sheet must be opened before this command can be used.
Args:
cell_values: Dict[str, Any], A dictionary of cell values to set in the spreadsheet. The keys are the cell coordinates in the format "A1", "B2", etc.
Supported value types include: float, int, string, bool, formulas.
app_name: str, The name of the spreadsheet application. For example, "Some_sheet.xlsx".
sheet_name: str, The name of the sheet in the spreadsheet. For example, "Sheet1".
"""
return SET_CELL_VALUES_CMD.format(
cell_values=cell_values, app_name=app_name, sheet_name=sheet_name
)
def scroll(self, instruction: str, clicks: int, shift: bool = False):
"""Scroll the element in the specified direction
Args:
instruction:str, a very detailed description of which element to enter scroll in. This description should be at least a full sentence. And make it clear and concise.
clicks:int, the number of clicks to scroll can be positive (up) or negative (down).
shift:bool, whether to use shift+scroll for horizontal scrolling
"""
x, y = self.resize_coordinates(self.coords1)
if shift:
return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.hscroll({clicks})"
else:
return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.vscroll({clicks})"
def hotkey(self, keys: List):
"""Press a hotkey combination
Args:
keys:List the keys to press in combination in a list format (e.g. ['ctrl', 'c'])
"""
# add quotes around the keys
keys = [f"'{key}'" for key in keys]
return f"import pyautogui; pyautogui.hotkey({', '.join(keys)})"
def hold_and_press(self, hold_keys: List, press_keys: List):
"""Hold a list of keys and press a list of keys
Args:
hold_keys:List, list of keys to hold
press_keys:List, list of keys to press in a sequence
"""
press_keys_str = "[" + ", ".join([f"'{key}'" for key in press_keys]) + "]"
command = "import pyautogui; "
for k in hold_keys:
command += f"pyautogui.keyDown({repr(k)}); "
command += f"pyautogui.press({press_keys_str}); "
for k in hold_keys:
command += f"pyautogui.keyUp({repr(k)}); "
return command
def wait(self, time: float):
"""Wait for a specified amount of time
Args:
time:float the amount of time to wait in seconds
"""
return f"""import time; time.sleep({time})"""
def done(
self,
return_value: Optional[Union[Dict, str, List, Tuple, int, float, bool]] = None,
):
"""End the current task with a success and the required return value"""
self.returned_info = return_value
return """DONE"""
def fail(self):
"""End the current task with a failure, and replan the whole task."""
return """FAIL"""
class DummyAgent:
def __init__(
self,
platform,
):
self.platform = (
platform # Dictates how the switch_applications agent action works.
)
self.width = 1
self.height = 1
self.notes = []
self.coords1 = None
self.coords2 = None
def generate_coords(self, ref_expr: str, obs: Dict) -> List[int]:
return 0,0
def generate_text_coords(
self, phrase: str, obs: Dict, alignment: str = ""
) -> List[int]:
return 0,0
# Takes a description based action and assigns the coordinates for any coordinate based action
# Raises an error if function can't be parsed
def assign_coordinates(self, plan: str, obs: Dict):
# Reset coords from previous action generation
self.coords1, self.coords2 = None, None
try:
# Extract the function name and args
action = parse_single_code_from_string(plan.split("Grounded Action")[-1])
function_name = re.match(r"(\w+\.\w+)\(", action).group(1)
args = self.parse_function_args(action)
except Exception as e:
raise RuntimeError(f"Error in parsing grounded action: {e}") from e
# arg0 is a description
if (
function_name in ["agent.click", "agent.type", "agent.scroll"]
and len(args) >= 1
and args[0] != None
):
self.coords1 = self.generate_coords(args[0], obs)
# arg0 and arg1 are descriptions
elif function_name == "agent.drag_and_drop" and len(args) >= 2:
self.coords1 = self.generate_coords(args[0], obs)
self.coords2 = self.generate_coords(args[1], obs)
# arg0 and arg1 are text phrases
elif function_name == "agent.highlight_text_span" and len(args) >= 2:
self.coords1 = self.generate_text_coords(args[0], obs, alignment="start")
self.coords2 = self.generate_text_coords(args[1], obs, alignment="end")
# Resize from grounding model dim into OSWorld dim (1920 * 1080)
def resize_coordinates(self, coordinates: List[int]) -> List[int]:
return [
round(coordinates[0] * self.width),
round(coordinates[1] * self.height),
]
# Given a generated ACI function, returns a list of argument values, where descriptions are at the front of the list
def parse_function_args(self, function: str) -> List[str]:
tree = ast.parse(function)
call_node = tree.body[0].value
def safe_eval(node):
if isinstance(
node, ast.Constant
): # Handles literals like numbers, strings, etc.
return node.value
else:
return ast.unparse(node) # Return as a string if not a literal
positional_args = [safe_eval(arg) for arg in call_node.args]
keyword_args = {kw.arg: safe_eval(kw.value) for kw in call_node.keywords}
res = []
for key, val in keyword_args.items():
if "description" in key:
res.append(val)
for arg in positional_args:
res.append(arg)
return res
def click(
self,
instruction: str,
num_clicks: int = 1,
button_type: str = "left",
hold_keys: List = [],
):
"""Click on the element
Args:
instruction:str, decribe the element you want to interact with in detail including the visual description and function description. And make it clear and concise. For example you can describe what the element looks like, and what will be the expected result when you interact with it.
num_clicks:int, number of times to click the element
button_type:str, which mouse button to press can be "left", "middle", or "right"
hold_keys:List, list of keys to hold while clicking
"""
x, y = self.resize_coordinates(self.coords1)
command = "import pyautogui; "
# TODO: specified duration?
for k in hold_keys:
command += f"pyautogui.keyDown({repr(k)}); "
command += f"""import pyautogui; pyautogui.click({x}, {y}, clicks={num_clicks}, button={repr(button_type)}); """
for k in hold_keys:
command += f"pyautogui.keyUp({repr(k)}); "
# Return pyautoguicode to click on the element
return command
def switch_applications(self, app_code):
"""Switch to a different application that is already open
Args:
app_code:str the code name of the application to switch to from the provided list of open applications
"""
if self.platform == "darwin":
return f"import pyautogui; import time; pyautogui.hotkey('command', 'space', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)"
elif self.platform == "linux":
return UBUNTU_APP_SETUP.replace("APP_NAME", app_code)
elif self.platform == "windows":
return f"import pyautogui; import time; pyautogui.hotkey('win', 'd', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)"
def open(self, app_or_filename: str):
"""Open any application or file with name app_or_filename. Use this action to open applications or files on the desktop, do not open manually.
Args:
app_or_filename:str, the name of the application or filename to open
"""
return f"import pyautogui; pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)"
def type(
self,
element_description: Optional[str] = None,
text: str = "",
overwrite: bool = False,
enter: bool = False,
):
"""Type text into a specific element
Args:
element_description:str, a detailed description of which element to enter text in. This description should be at least a full sentence.
text:str, the text to type
overwrite:bool, Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element.
enter:bool, Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False.
"""
if self.coords1 is not None:
# If a node is found, retrieve its coordinates and size
# Start typing at the center of the element
x, y = self.resize_coordinates(self.coords1)
command = "import pyautogui; "
command += f"pyautogui.click({x}, {y}); "
if overwrite:
command += (
f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); "
)
command += f"pyautogui.write({repr(text)}); "
if enter:
command += "pyautogui.press('enter'); "
else:
# If no element is found, start typing at the current cursor location
command = "import pyautogui; "
if overwrite:
command += (
f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); "
)
command += f"pyautogui.write({repr(text)}); "
if enter:
command += "pyautogui.press('enter'); "
return command
def drag_and_drop(
self, starting_description: str, ending_description: str, hold_keys: List = []
):
"""Drag from the starting description to the ending description
Args:
starting_description:str, a very detailed description of where to start the drag action. This description should be at least a full sentence. And make it clear and concise.
ending_description:str, a very detailed description of where to end the drag action. This description should be at least a full sentence. And make it clear and concise.
hold_keys:List list of keys to hold while dragging
"""
x1, y1 = self.resize_coordinates(self.coords1)
x2, y2 = self.resize_coordinates(self.coords2)
command = "import pyautogui; "
command += f"pyautogui.moveTo({x1}, {y1}); "
# TODO: specified duration?
for k in hold_keys:
command += f"pyautogui.keyDown({repr(k)}); "
command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "
for k in hold_keys:
command += f"pyautogui.keyUp({repr(k)}); "
# Return pyautoguicode to drag and drop the elements
return command
def highlight_text_span(self, starting_phrase: str, ending_phrase: str):
"""Highlight a text span between a provided starting phrase and ending phrase. Use this to highlight words, lines, and paragraphs.
Args:
starting_phrase:str, the phrase that denotes the start of the text span you want to highlight. If you only want to highlight one word, just pass in that single word.
ending_phrase:str, the phrase that denotes the end of the text span you want to highlight. If you only want to highlight one word, just pass in that single word.
"""
x1, y1 = self.coords1
x2, y2 = self.coords2
command = "import pyautogui; "
command += f"pyautogui.moveTo({x1}, {y1}); "
command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "
# Return pyautoguicode to drag and drop the elements
return command
def set_cell_values(
self, cell_values: Dict[str, Any], app_name: str, sheet_name: str
):
"""Use this to set individual cell values in a spreadsheet. For example, setting A2 to "hello" would be done by passing {"A2": "hello"} as cell_values. The sheet must be opened before this command can be used.
Args:
cell_values: Dict[str, Any], A dictionary of cell values to set in the spreadsheet. The keys are the cell coordinates in the format "A1", "B2", etc.
Supported value types include: float, int, string, bool, formulas.
app_name: str, The name of the spreadsheet application. For example, "Some_sheet.xlsx".
sheet_name: str, The name of the sheet in the spreadsheet. For example, "Sheet1".
"""
return SET_CELL_VALUES_CMD.format(
cell_values=cell_values, app_name=app_name, sheet_name=sheet_name
)
def scroll(self, instruction: str, clicks: int, shift: bool = False):
"""Scroll the element in the specified direction
Args:
instruction:str, a very detailed description of which element to enter scroll in. This description should be at least a full sentence. And make it clear and concise.
clicks:int, the number of clicks to scroll can be positive (up) or negative (down).
shift:bool, whether to use shift+scroll for horizontal scrolling
"""
x, y = self.resize_coordinates(self.coords1)
if shift:
return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.hscroll({clicks})"
else:
return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.vscroll({clicks})"
def hotkey(self, keys: List):
"""Press a hotkey combination
Args:
keys:List the keys to press in combination in a list format (e.g. ['ctrl', 'c'])
"""
# add quotes around the keys
keys = [f"'{key}'" for key in keys]
return f"import pyautogui; pyautogui.hotkey({', '.join(keys)})"
def hold_and_press(self, hold_keys: List, press_keys: List):
"""Hold a list of keys and press a list of keys
Args:
hold_keys:List, list of keys to hold
press_keys:List, list of keys to press in a sequence
"""
press_keys_str = "[" + ", ".join([f"'{key}'" for key in press_keys]) + "]"
command = "import pyautogui; "
for k in hold_keys:
command += f"pyautogui.keyDown({repr(k)}); "
command += f"pyautogui.press({press_keys_str}); "
for k in hold_keys:
command += f"pyautogui.keyUp({repr(k)}); "
return command
def wait(self, time: float):
"""Wait for a specified amount of time
Args:
time:float the amount of time to wait in seconds
"""
return f"""import time; time.sleep({time})"""
def done(
self,
return_value: Optional[Union[Dict, str, List, Tuple, int, float, bool]] = None,
):
"""End the current task with a success and the required return value"""
self.returned_info = return_value
return """DONE"""
def fail(self):
"""End the current task with a failure, and replan the whole task."""
return """FAIL"""
def run_python(self,code):
return code
def fast_open_terminal(self, *args,**kwargs):
app_or_filename='terminal'
return f"import time; import pyautogui; pyautogui.hotkey('ctrl', 's'); time.sleep(0.5); pyautogui.hotkey('alt', 'f4'); time.sleep(0.5); pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)"
def call_llm_safe(agent):
'''
functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L27
'''
# Retry if fails
attempt = 0
response = ""
while attempt < MAX_RETRY_TIMES:
try:
response = agent.get_response()
break # If successful, break out of the loop
except Exception as e:
attempt += 1
print(f"Attempt {attempt} failed: {e}")
if attempt == MAX_RETRY_TIMES:
print("Max retries reached. Handling failure.")
time.sleep(1.0)
return response
def parse_single_code_from_string(input_string):
'''
functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L129
'''
input_string = input_string.strip()
if input_string.strip() in ["WAIT", "DONE", "FAIL"]:
return input_string.strip()
# This regular expression will match both ```code``` and ```python code```
# and capture the `code` part. It uses a non-greedy match for the content inside.
pattern = r"```(?:\w+\s+)?(.*?)```"
# Find all non-overlapping matches in the string
matches = re.findall(pattern, input_string, re.DOTALL)
# The regex above captures the content inside the triple backticks.
# The `re.DOTALL` flag allows the dot `.` to match newline characters as well,
# so the code inside backticks can span multiple lines.
# matches now contains all the captured code snippets
codes = []
for match in matches:
match = match.strip()
commands = [
"WAIT",
"DONE",
"FAIL",
] # fixme: updates this part when we have more commands
if match in commands:
codes.append(match.strip())
elif match.split("\n")[-1] in commands:
if len(match.split("\n")) > 1:
codes.append("\n".join(match.split("\n")[:-1]))
codes.append(match.split("\n")[-1])
else:
codes.append(match)
return codes[0]
agent = OSWorldACI('linux')
class GTA1Agent:
'''
class based on https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/jedi_7b_agent.py
'''
def __init__(
self,
platform="ubuntu",
planner_model="o3",
max_tokens=4096,
top_p=0.9,
temperature= 0.0,
action_space="pyautogui",
observation_type="screenshot",
max_steps=100,
max_image_history_length = 5,
N_SEQ = 8,
client_password="password"
):
self.platform = platform
self.max_tokens = max_tokens
self.top_p = top_p
self.temperature = temperature
self.client_password = client_password
self.action_space = action_space
self.observation_type = observation_type
assert action_space in ["pyautogui"], "Invalid action space"
assert observation_type in ["screenshot"], "Invalid observation type"
self.thoughts = []
self.actions = []
self.observations = []
self.observation_captions = []
self.max_steps = max_steps
self.planner_model=planner_model
self.current_step = 1
self.max_image_history_length = max_image_history_length
self.N_SEQ=N_SEQ
def predict(self, instruction: str, obs: Dict) -> List:
"""
Predict the next action(s) based on the current observation.
"""
user_prompt = (
f"""Please generate the next move according to the UI screenshot and instruction. And you can refer to the previous actions and observations for reflection.\n\nInstruction: {instruction}\n\n""")
system_prompt = GTA1_PLANNER_SYSTEM_PROMPT
messages = [{
"role": "system",
"content": [{
"type": "text",
"text": system_prompt.replace("{current_step}", str(self.current_step)).replace("{max_steps}", str(self.max_steps))
}]
}]
# Determine which observations to include images for (only most recent ones)
obs_start_idx = max(0, len(self.observations) - self.max_image_history_length)
# Add all thought and action history
for i in range(len(self.thoughts)):
# For recent steps, include the actual screenshot
if i >= obs_start_idx:
messages.append({
"role": "user",
"content": [{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}",
"detail": "high"
},
}]
})
messages.append({
"role": "user",
"content": [{
"type": "text",
"text": f"Step {i+1} Observation:\n{self.observation_captions[i]}\n"
}]
})
thought_messages = f"Step {i+1} Thought:\n{self.thoughts[i]}"
action_messages = f"Step {i+1} Action:"
for action in self.actions[i]:
action_messages += f"\n{action}"
messages.append({
"role": "assistant",
"content": [{
"type": "text",
"text": thought_messages + "\n" + action_messages
}]
})
messages.append({
"role":"user",
"content": [
{
"type":"image_url",
"image_url":{
"url":f"data:image/png;base64,{encode_image(obs['screenshot'])}",
"detail": "high"
},
},
{
"type": "text",
"text": user_prompt
},
],
})
N = self.N_SEQ
logger.info(f"Executing planning")
planner_response = []
for bn in split_to_batches(N, batch_size=8):
planner_response_ = self.call_llm({
"model": self.planner_model,
"messages": messages,
"n": bn,
"max_completion_tokens": self.max_tokens,
}, self.planner_model)
planner_response.extend(planner_response_)
valid_responses = [response for response in planner_response if self.isvalid(response)]
N = N - len(valid_responses)
planner_response = [response for response in planner_response if not self.isvalid(response)]
if planner_response:
planner_response = planner_response[0]
retry_count = 0
max_retries = 5
while N > 0:
logger.info(f"Executing planning {retry_count}")
if retry_count >= max_retries:
break
messages.append({
"role": "user",
"content": [
{"type": "text", "text": """You didn't generate a valid "Observation:\n(.*?)\n" section, a valid "Thought:\n(.*?)\n" section, or valid actions. Please try again."""} #"You didn't generate valid actions. Please try again."}
]
})
planner_response = []
for bn in split_to_batches(N, batch_size=8):
planner_response_ = self.call_llm({
"model": self.planner_model,
"messages": messages,
"n": bn,
"max_completion_tokens": self.max_tokens * 4,
}, self.planner_model)
planner_response.extend(planner_response_)
valid_responses_ = [response for response in planner_response if self.isvalid(response)]
N = N - len(valid_responses_)
planner_response = [response for response in planner_response if not self.isvalid(response)]
if planner_response:
planner_response = planner_response[0]
valid_responses.extend(valid_responses_)
retry_count += 1
# assert len(valid_responses) > int(self.N_SEQ) * 0.8, f"Not enough valid responses generated {len(valid_responses)}"
logger.info(f"Executing selection")
if self.N_SEQ > 1:
history_cache = [f"Observation:\n{o}\nThought:\n{t}\nAction:\n{a}" for a,t,o in zip(self.actions, self.thoughts, self.observation_captions)]
planner_response = self.select(instruction, Image.open(BytesIO(obs['screenshot'])), valid_responses, history_cache)
else:
planner_response = valid_responses[0]
codes = self.parse_code_from_planner_response(planner_response)
thought = self.parse_thought_from_planner_response(planner_response)
observation_caption = self.parse_observation_caption_from_planner_response(planner_response)
def request_vllm(image, prompt):
if isinstance(image, bytes):
image = np.array(Image.open(BytesIO(image)).convert('RGB'))
H, W, C = image.shape
H, W = smart_resize(
H,
W,
factor=28,
min_pixels=1000,
max_pixels=1000000000000,
)
assert C == 3
if isinstance(image, np.ndarray):
image_base64 = encode_numpy_image_to_base64(image)
elif isinstance(image, bytes):
image_base64 = encode_image_bytes(image)
else:
raise ValueError(f"Invalid image type: {type(image)}")
messages=[
{"role": "system", "content": GTA1_GROUNDING_SYSTEM_PROMPT.format(height=H, width=W)},
{
"role":
"user",
"content": [
{
"type": "text",
"text": prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{image_base64}"
},
},
],
}]
vllm_client = OpenAI(
base_url=GTA1_SERVICE_URL,
api_key=GTA1_API_KEY,
)
response = vllm_client.chat.completions.create(
model=GTA1_MODEL_NMAE,
messages=messages,
max_tokens=100,
temperature=0,
n=1
)
result = response.choices[0].message.content
matches = re.findall(r"\((-?\d*\.?\d+),\s*(-?\d*\.?\d+)\)", result)
x,y = [tuple(map(int, match)) for match in matches][0]
x = x/W
y = y/H
return x,y
logger.info(f"Executing grounding")
agent.assign_coordinates(planner_response, obs, request_vllm)
plan_code = extract_first_agent_function("\n".join(codes))
pyautogui_actions = [eval(plan_code)]
plan_code = [plan_code]
self.actions.append([plan_code])
self.observations.append(obs)
self.thoughts.append(thought)
self.observation_captions.append(observation_caption)
self.current_step += 1
if self.current_step >= self.max_steps:
pyautogui_actions = ["FAIL"]
return planner_response, pyautogui_actions
def select(self, instruction, screenshot, response, history_cache):
height, width = screenshot.height, screenshot.width
height, width = smart_resize(
height,
width,
factor=28,
min_pixels=1000,
max_pixels=1000000000000,
)
image = screenshot.resize((height, width))
system_promt = GTA1_JUDGE_SYSTEM_PROMPT.format(N_PLANNING=len(response), N_INDEX=len(response)-1,width=width,height=height, CLIENT_PASSWORD=self.client_password)
lines = [
f"The goal of the task is:\n{instruction}",
]
if len(history_cache) == 0:
history_cache = ["No history available. The action just started"]
lines = [
f"The goal of the task is:\n{instruction}",
"Here are the past history:"
]
lines += [
f"### Past step {idx}:\n{step}"
for idx, step in enumerate(history_cache)
]
lines += ["Here are the different plans to compare:"]
lines += [
f"### Index {idx}:\n{plan}"
for idx, plan in enumerate(response)
]
user_message = "\n".join(lines)
messages = [
{
"role": "system",
"content": [{"type": "text", "text": system_promt}]
},
{
"role": "user",
"content": [{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{pil_to_base64(image)}"}}, {"type": "text", "text": user_message}]
}
]
url = "https://api.openai.com/v1/chat/completions"
headers = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", "Content-Type":"application/json"}
payload = {
"model": "o3",
"messages": messages,
"max_completion_tokens": 4096 * 4,
}
wait = 1
for _ in range(MAX_RETRY_TIMES):
try:
prediction = requests.post(url, headers=headers, json=payload, proxies=proxies, timeout=180)
if prediction.status_code != 200:
continue
prediction = prediction.json()['choices'][0]['message']['content']
prediction = extract_answer_from_response(prediction)
return response[prediction['index']]
except:
time.sleep(wait)
wait *=2
wait = min(wait,32)
continue
return response[0]
def isvalid(self,planner_response):
try:
agent.dummy_agent.assign_coordinates(planner_response, {"screenshot": None})
except:
return False
codes = self.parse_code_from_planner_response(planner_response)
try:
test_code = extract_first_agent_function("\n".join(codes))
test_code = "agent.dummy_agent." + test_code[6:]
eval(test_code)
except Exception as e:
#print("Invalid code:", [test_code], str(e), "!!!")
return False
thought = self.parse_thought_from_planner_response(planner_response)
observation_caption = self.parse_observation_caption_from_planner_response(planner_response)
return bool(codes and thought and observation_caption)
def parse_code_from_planner_response(self, input_string: str) -> List[str]:
input_string = "\n".join([line.strip() for line in input_string.split(';') if line.strip()])
pattern = r"```(?:\w+\s+)?(.*?)```"
matches = re.findall(pattern, input_string, re.DOTALL)
codes = []
for match in matches:
match = match.strip()
codes.append(match)
return codes
def unsetonestep(self):
self.actions = self.actions[:-1]
self.observations = self.actions[:-1]
self.thoughts.append = self.actions[:-1]
self.observation_captions = self.actions[:-1]
self.current_step -= 1
def parse_observation_caption_from_planner_response(self, input_string: str) -> str:
pattern = r"Observation:\n(.*?)\n"
matches = re.findall(pattern, input_string, re.DOTALL)
if matches:
return matches[0].strip()
return ""
def parse_thought_from_planner_response(self, input_string: str) -> str:
pattern = r"Thought:\n(.*?)\n"
matches = re.findall(pattern, input_string, re.DOTALL)
if matches:
return matches[0].strip()
return ""
@backoff.on_exception(
backoff.constant,
# here you should add more model exceptions as you want,
# but you are forbidden to add "Exception", that is, a common type of exception
# because we want to catch this kind of Exception in the outside to ensure
# each example won't exceed the time limit
(
# General exceptions
SSLError,
# OpenAI exceptions
openai.RateLimitError,
openai.BadRequestError,
openai.InternalServerError,
# Google exceptions
InvalidArgument,
ResourceExhausted,
InternalServerError,
BadRequest,
# Groq exceptions
# todo: check
),
interval=30,
max_tries=10,
)
def call_llm(self, payload, model):
if model.startswith("gpt") or "o3" in model:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"
}
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
proxies=proxies,
json=payload,
)
#print(response.status_code,"!!!")
#print(response.json(),"!!!")
if response.status_code != 200:
time.sleep(5)
return ""
else:
response = response.json()
return [response["choices"][i]["message"]["content"] for i in range(len(response["choices"]))]
else:
raise SystemExit
def reset(self, _logger=None):
global logger
logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
self.thoughts = []
self.action_descriptions = []
self.actions = []
self.observations = []
self.observation_captions = []
self.current_step = 1
def extract_first_agent_function(code_string):
'''
functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L189
'''
# Regular expression pattern to match 'agent' functions with any arguments, including nested parentheses
pattern = r'agent\.[a-zA-Z_]+\((?:[^()\'"]|\'[^\']*\'|"[^"]*")*\)'
# Find all matches in the string
matches = re.findall(pattern, code_string)
# Return the first match if found, otherwise return None
return matches[0] if matches else None
def split_to_batches(n, batch_size=8):
batches = [batch_size] * (n // batch_size)
remainder = n % batch_size
if remainder:
batches.append(remainder)
return batches
def extract_answer_from_response(response):
if not response or not isinstance(response, str):
raise ValueError("Response must be a non-empty string")
json_pattern = r'```json\s*(.*?)\s*```'
json_match = re.search(json_pattern, response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
try:
answer = json.loads(json_str)
if "explaining" in answer and "index" in answer:
answer["index"] = int(answer["index"])
return answer
else:
raise ValueError("JSON missing required fields 'explaining' or 'index'")
except json.JSONDecodeError:
pass
direct_json_pattern = r'\{[\s\S]*?"explaining"[\s\S]*?"index"[\s\S]*?\}'
direct_match = re.search(direct_json_pattern, response)
if direct_match:
try:
json_str = direct_match.group(0)
json_str = json_str.replace(''', "'").replace(''', "'").replace('"', '"').replace('"', '"')
answer = json.loads(json_str)
answer["index"] = int(answer["index"])
return answer
except json.JSONDecodeError:
pass
index_pattern = r'"index"\s*:\s*(\d+)'
index_match = re.search(index_pattern, response)
explaining_pattern = r'"explaining"\s*:\s*"(.*?)"(?=,|\s*})'
explaining_match = re.search(explaining_pattern, response, re.DOTALL)
if not explaining_match:
explaining_pattern = r'"explaining"\s*:\s*(.*?)(?=,\s*"index"|\s*})'
explaining_match = re.search(explaining_pattern, response, re.DOTALL)
if index_match and explaining_match:
return {
"index": int(index_match.group(1)),
"explaining": explaining_match.group(1).strip('" \t\n')
}
if index_match:
return {
"index": int(index_match.group(1)),
"explaining": "Explanation not found in response"
}
raise ValueError("Could not extract valid answer from response")
def pil_to_base64(image):
'''
function borrow from https://github.com/xlang-ai/OSWorld/blob/7d0ad02706a7fe742fa1ad6a483782835e3d51e6/mm_agents/uitars_agent.py#L486
'''
buffer = BytesIO()
image.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode("utf-8")
def encode_numpy_image_to_base64(image: np.ndarray) -> str:
"""Converts a numpy array image to base64 string.
Args:
image: Numpy array representing an image (height, width, channels)
Returns:
Base64 encoded string of the image
"""
# Convert numpy array to bytes
success, buffer = cv2.imencode('.png', image)
if not success:
raise ValueError("Failed to encode image to png format")
# Convert bytes to base64 string
image_bytes = buffer.tobytes()
base64_string = base64.b64encode(image_bytes).decode('utf-8')
return base64_string
def encode_image_bytes(image_content):
return base64.b64encode(image_content).decode('utf-8')