# OS-World, commit 84f407a
import base64
import json
import logging
import os
import time
from typing import Any, Dict, List, Tuple
import openai
from desktop_env.desktop_env import DesktopEnv
from openai import OpenAI # pip install --upgrade openai>=1.30
logger = logging.getLogger("desktopenv")
# Pricing for the computer-use-preview model, in USD per 1M tokens.
CUA_INPUT_PRICE_PER_1M_TOKENS = 3.00
CUA_OUTPUT_PRICE_PER_1M_TOKENS = 12.00
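# Worked example: a call that consumes 10,000 input tokens and produces 1,000
# output tokens costs (10_000 / 1e6) * 3.00 + (1_000 / 1e6) * 12.00 = $0.042.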
PROMPT_TEMPLATE = """# Task
{instruction}
# Hints
- Sudo password is "{CLIENT_PASSWORD}".
- If you encounter an "Authentication required" dialog, enter "{CLIENT_PASSWORD}" to continue.
- Do not close any application, window, or tab that is already open.
- Do not close the window at the end of the task.
- If you have completed the user task, reply with the information you want the user to know along with 'TERMINATE'.
""".strip()
DEFAULT_REPLY = "Please continue the user task. If you have completed the user task, reply with the information you want the user to know along with 'TERMINATE'."
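# Example (illustrative values): PROMPT_TEMPLATE.format(instruction="Open Firefox",
# CLIENT_PASSWORD="secret") renders a prompt beginning with:
#     # Task
#     Open Firefox
#     # Hints
#     - Sudo password is "secret".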
def _cua_to_pyautogui(action) -> str:
"""Convert an Action (dict **or** Pydantic model) into a pyautogui call."""
def fld(key: str, default: Any = None) -> Any:
return action.get(key, default) if isinstance(action, dict) else getattr(action, key, default)
act_type = fld("type")
if not isinstance(act_type, str):
act_type = str(act_type).split(".")[-1]
act_type = act_type.lower()
if act_type in ["click", "double_click"]:
        button = fld('button', 'left')
        # Normalise CUA button names/codes to pyautogui's left/middle/right
        # (the CUA schema can also report 'wheel' for the middle button).
        if button in (1, 'left'):
            button = 'left'
        elif button in (2, 'middle', 'wheel'):
            button = 'middle'
        elif button in (3, 'right'):
            button = 'right'
if act_type == "click":
return f"pyautogui.click({fld('x')}, {fld('y')}, button='{button}')"
if act_type == "double_click":
return f"pyautogui.doubleClick({fld('x')}, {fld('y')}, button='{button}')"
if act_type == "scroll":
cmd = ""
if fld('scroll_y', 0) != 0:
cmd += f"pyautogui.scroll({-fld('scroll_y', 0) / 100}, x={fld('x', 0)}, y={fld('y', 0)});"
return cmd
if act_type == "drag":
path = fld('path', [{"x": 0, "y": 0}, {"x": 0, "y": 0}])
cmd = f"pyautogui.moveTo({path[0]['x']}, {path[0]['y']}, _pause=False); "
cmd += f"pyautogui.dragTo({path[1]['x']}, {path[1]['y']}, duration=0.5, button='left')"
return cmd
if act_type == 'move':
return f"pyautogui.moveTo({fld('x')}, {fld('y')})"
if act_type == "keypress":
keys = fld("keys", []) or [fld("key")]
if len(keys) == 1:
return f"pyautogui.press('{keys[0].lower()}')"
else:
return "pyautogui.hotkey('{}')".format("', '".join(keys)).lower()
if act_type == "type":
text = str(fld("text", ""))
return "pyautogui.typewrite({:})".format(repr(text))
if act_type == "wait":
return "WAIT"
return "WAIT" # fallback
def _to_input_items(output_items: list) -> list:
    """
    Convert `response.output` into the JSON-serialisable items we're allowed
    to resend in the next request. Currently this strips the `status` key,
    which the CUA input schema does not accept; history-length truncation
    happens later, in `run_cua`.
    """
    cleaned: List[Dict[str, Any]] = []
    for item in output_items:
        raw: Dict[str, Any] = item if isinstance(item, dict) else item.model_dump()
        # ---- strip keys the input schema rejects ---------------------------
        raw.pop("status", None)
        cleaned.append(raw)
    return cleaned
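# Example: a finished call item is echoed back minus its `status` field:
#     >>> _to_input_items([{"type": "computer_call", "call_id": "c1", "status": "completed"}])
#     [{'type': 'computer_call', 'call_id': 'c1'}]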
def call_openai_cua(client: OpenAI,
history_inputs: list,
screen_width: int = 1920,
screen_height: int = 1080,
environment: str = "linux") -> Tuple[Any, float]:
    retry = 0
    response = None
    while retry < 3:
        try:
            response = client.responses.create(
                model="computer-use-preview",
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": screen_width,
                    "display_height": screen_height,
                    "environment": environment,
                }],
                input=history_inputs,
                reasoning={"summary": "concise"},
                tool_choice="required",
                truncation="auto",
            )
            break
        except (openai.BadRequestError, openai.InternalServerError) as e:
            retry += 1
            logger.error(f"Error in client.responses.create: {e}")
            time.sleep(0.5)
    if response is None:
        raise RuntimeError("Failed to call OpenAI after 3 attempts.")
cost = 0.0
if response and hasattr(response, "usage") and response.usage:
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
        input_cost = (input_tokens / 1_000_000) * CUA_INPUT_PRICE_PER_1M_TOKENS
        output_cost = (output_tokens / 1_000_000) * CUA_OUTPUT_PRICE_PER_1M_TOKENS
cost = input_cost + output_cost
return response, cost
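# Usage sketch (assumes OPENAI_API_KEY is set and `history_inputs` was built
# as in run_cua below):
#     client = OpenAI()
#     response, cost = call_openai_cua(client, history_inputs, 1920, 1080)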
def run_cua(
env: DesktopEnv,
instruction: str,
max_steps: int,
save_path: str = './',
screen_width: int = 1920,
screen_height: int = 1080,
sleep_after_execution: float = 0.3,
truncate_history_inputs: int = 100,
client_password: str = "",
) -> Tuple[List[Dict[str, Any]], str, float]:
client = OpenAI()
    # 0 / take the initial screenshot
logger.info(f"Instruction: {instruction}")
obs = env.controller.get_screenshot()
screenshot_b64 = base64.b64encode(obs).decode("utf-8")
with open(os.path.join(save_path, "initial_screenshot.png"), "wb") as f:
f.write(obs)
history_inputs = [{
"role": "user",
"content": [
{"type": "input_text", "text": PROMPT_TEMPLATE.format(instruction=instruction, CLIENT_PASSWORD=client_password)},
{"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_b64}"},
],
}]
response, cost = call_openai_cua(client, history_inputs, screen_width, screen_height)
total_cost = cost
logger.info(f"Cost: ${cost:.6f} | Total Cost: ${total_cost:.6f}")
step_no = 0
reasoning_list = []
reasoning = ""
# 1 / iterative dialogue
while step_no < max_steps:
step_no += 1
history_inputs += _to_input_items(response.output)
        # --- robustly pull out computer_call(s) ------------------------------
        calls: List[Dict[str, Any]] = []
        breakflag = False
        for i, o in enumerate(response.output):
            typ = o["type"] if isinstance(o, dict) else getattr(o, "type", None)
            if not isinstance(typ, str):
                typ = str(typ).split(".")[-1]
            if typ == "computer_call":
                calls.append(o if isinstance(o, dict) else o.model_dump())
            elif typ == "reasoning" and len(o.summary) > 0:
                reasoning = o.summary[0].text
                reasoning_list.append(reasoning)
                logger.info(f"[Reasoning]: {reasoning}")
            elif typ == 'message':
                if 'TERMINATE' in o.content[0].text:
                    reasoning_list.append(f"Final output: {o.content[0].text}")
                    reasoning = "My thinking process\n- " + "\n- ".join(reasoning_list) + '\nPlease check the screenshot and see if it fulfills your requirements.'
                    breakflag = True
                    break
                try:
                    # Messages that parse as raw JSON are tool-call echoes, not
                    # real replies: drop them from the history and retry the step.
                    json.loads(o.content[0].text)
                    history_inputs.pop(len(history_inputs) - len(response.output) + i)
                    step_no -= 1
                except Exception:
                    logger.info(f"[Message]: {o.content[0].text}")
                    if '?' in o.content[0].text:
                        # The model asked a question; nudge it to keep going.
                        history_inputs += [{
                            "role": "user",
                            "content": [
                                {"type": "input_text", "text": DEFAULT_REPLY},
                            ],
                        }]
                    elif "{" in o.content[0].text and "}" in o.content[0].text:
                        # Looks like partial JSON; drop it and retry the step.
                        history_inputs.pop(len(history_inputs) - len(response.output) + i)
                        step_no -= 1
                    else:
                        # Plain progress commentary: record it as reasoning and
                        # drop the message from the resend history.
                        history_inputs.pop(len(history_inputs) - len(response.output) + i)
                        reasoning = o.content[0].text
                        reasoning_list.append(reasoning)
                        step_no -= 1
if breakflag:
break
for action_call in calls:
py_cmd = _cua_to_pyautogui(action_call["action"])
# --- execute in VM ---------------------------------------------------
obs, *_ = env.step(py_cmd, sleep_after_execution)
# --- send screenshot back -------------------------------------------
screenshot_b64 = base64.b64encode(obs["screenshot"]).decode("utf-8")
with open(os.path.join(save_path, f"step_{step_no}.png"), "wb") as f:
f.write(obs["screenshot"])
history_inputs += [{
"type": "computer_call_output",
"call_id": action_call["call_id"],
"output": {
"type": "computer_screenshot",
"image_url": f"data:image/png;base64,{screenshot_b64}",
},
}]
if "pending_safety_checks" in action_call and len(action_call.get("pending_safety_checks", [])) > 0:
history_inputs[-1]['acknowledged_safety_checks'] = [
{
"id": psc["id"],
"code": psc["code"],
"message": "Please acknowledge this warning if you'd like to proceed."
}
for psc in action_call.get("pending_safety_checks", [])
]
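            # Resulting item shape (sketch; field layout as the computer-use
            # docs describe it, with acknowledgements alongside the output):
            #     {"type": "computer_call_output", "call_id": "...",
            #      "acknowledged_safety_checks": [{"id": ..., "code": ..., "message": ...}],
            #      "output": {"type": "computer_screenshot", "image_url": "data:image/png;base64,..."}}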
# truncate history inputs while preserving call_id pairs
if len(history_inputs) > truncate_history_inputs:
original_history = history_inputs[:]
history_inputs = [history_inputs[0]] + history_inputs[-truncate_history_inputs:]
            # Map each call_id to the item types that reference it; a healthy
            # pair has both a computer_call and its computer_call_output.
            call_id_types: Dict[str, List[str]] = {}
            for item in history_inputs:
                if isinstance(item, dict) and 'call_id' in item:
                    call_id_types.setdefault(item['call_id'], []).append(item.get('type', ''))
            # Collect call_ids that lost one half of their pair to truncation
            unpaired_call_ids = [
                call_id for call_id, types in call_id_types.items()
                if not ('computer_call' in types and 'computer_call_output' in types)
            ]
# Add missing pairs from original history while preserving order
if unpaired_call_ids:
# Find missing paired items in their original order
missing_items = []
for item in original_history:
if (isinstance(item, dict) and
item.get('call_id') in unpaired_call_ids and
item not in history_inputs):
missing_items.append(item)
# Insert missing items back, preserving their original order
# We need to find appropriate insertion points to maintain chronology
for missing_item in missing_items:
# Find the best insertion point based on original history order
original_index = original_history.index(missing_item)
# Find insertion point in truncated history
insert_pos = len(history_inputs) # default to end
for i, existing_item in enumerate(history_inputs[1:], 1): # skip first item (initial prompt)
if existing_item in original_history:
existing_original_index = original_history.index(existing_item)
if existing_original_index > original_index:
insert_pos = i
break
history_inputs.insert(insert_pos, missing_item)
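        # Keeping call/output pairs intact matters: the Responses API rejects
        # a request whose computer_call_output references a call_id with no
        # matching computer_call in the input, hence the re-pairing above.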
response, cost = call_openai_cua(client, history_inputs, screen_width, screen_height)
total_cost += cost
logger.info(f"Cost: ${cost:.6f} | Total Cost: ${total_cost:.6f}")
logger.info(f"Total cost for the task: ${total_cost:.4f}")
    # Redact the base64 screenshots so the returned transcript stays small.
    history_inputs[0]['content'][1]['image_url'] = "<image>"
    for item in history_inputs:
        if item.get('type', None) == 'computer_call_output':
            item['output']['image_url'] = "<image>"
    return history_inputs, reasoning, total_cost
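# Minimal usage sketch. Assumptions: OPENAI_API_KEY is set, and DesktopEnv's
# constructor arguments below are illustrative only -- consult desktop_env's
# documentation for the real provider/VM options.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    env = DesktopEnv(action_space="pyautogui")  # illustrative arguments
    history, final_reasoning, total = run_cua(
        env,
        instruction="Open the file manager and create a folder named 'demo'.",
        max_steps=15,
        client_password="password",  # hypothetical sudo password
    )
    print(final_reasoning)
    print(f"Total cost: ${total:.4f}")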