/
OS-World9f97535import base64
import logging
import time
from typing import Dict, List, Tuple, Any, Optional
import httpx
logger = logging.getLogger("desktopenv.agent")
class Timer:
"""Context manager for timing code blocks."""
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, *args):
self.duration = time.time() - self.start
class AGIAgent:
"""Agent that communicates with your private AGI server for decision-making."""
def __init__(
self,
env,
server_url: str = "https://your-private-agi-endpoint", # Contact the authors for access to a private deployment endpoint.
platform: str = "ubuntu",
action_space: str = "pyautogui",
observation_type: str = "screenshot",
max_trajectory_length: int = 100,
client_password: str = "",
provider_name: str = "aws",
screen_width: int = 1920,
screen_height: int = 1080,
timeout: int = 1800,
):
"""Initialize the AGI client.
Args:
env: The desktop environment
server_url: URL of your private AGI server
"""
self.env = env
self.server_url = server_url.rstrip("/")
self.platform = platform
self.action_space = action_space
self.observation_type = observation_type
self.max_trajectory_length = max_trajectory_length
self.client_password = client_password
self.provider_name = provider_name
self.screen_width = screen_width
self.screen_height = screen_height
# Session management
self.session_id: Optional[str] = None
self.instruction: Optional[str] = None
# HTTP client
self.client = httpx.Client(timeout=timeout)
# Tracking
self.thoughts = []
self.actions = []
self.observations = []
logger.info(f"Initialized AGIAgent with server URL: {self.server_url}")
def reset(self, runtime_logger=None):
"""Reset the agent and create a new session on the server.
Args:
runtime_logger: Optional logger for runtime information
"""
global logger
logger = runtime_logger if runtime_logger is not None else logging.getLogger("desktopenv.agent")
# Clear local state
self.thoughts = []
self.actions = []
self.observations = []
self.session_id = None
logger.info("AGIAgent reset complete")
def _create_session(self, instruction: str) -> str:
"""Create a new session on the server.
Args:
instruction: The task instruction
Returns:
The session ID
Equivalent curl request:
curl -X POST {server_url}/sessions \
-H "Content-Type: application/json" \
-d '{"task_description": "{instruction}"}'
"""
try:
# print(f"Creating session with instruction: {instruction}")
# print(f"Server URL: {self.server_url}")
response = self.client.post(
f"{self.server_url}/sessions",
json={"task_description": instruction}
)
response.raise_for_status()
session_id = response.json()["session_id"]
logger.info(f"Created session: {session_id}")
return session_id
except Exception as e:
logger.error(f"Failed to create session: {e}")
raise
def predict(self, instruction: str, obs: Dict) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""Predict the next action based on the current observation.
Args:
instruction: The task instruction
obs: Observation dictionary containing 'screenshot' key with image bytes
Returns:
Tuple of (predict_info dict, list of action dicts)
"""
# Create session on first prediction
if self.session_id is None:
self.instruction = instruction
self.session_id = self._create_session(instruction)
# input("Session created, press Enter to continue")
# Encode screenshot to base64
screenshot_bytes = obs["screenshot"]
screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
# Call the server
with Timer() as model_timer:
try:
response = self.client.post(
f"{self.server_url}/sessions/{self.session_id}/step",
json={
"screenshot_base64_png": screenshot_b64,
"error": None # Could be populated from previous step errors
}
)
response.raise_for_status()
result = response.json()
parsed_action = result["parsed_response"]
logger.info(f"Server returned action: {parsed_action[:100]}...")
except Exception as e:
logger.error(f"Error calling server: {e}")
raise
# Format response as expected by lib_run_single
actions = [{
"action_space": "pyautogui",
"action": parsed_action,
"pending_checks": [],
"call_id": ""
}]
# Check if task is complete or failed
state_correct = parsed_action not in ["FAIL", "DONE"]
predict_info = {
"model_usage": {
"model_time": model_timer.duration,
"prompt_tokens": 0, # Server doesn't expose these
"completion_tokens": 0,
},
"messages": [], # Server manages conversation history
"response": parsed_action,
"state_correct": state_correct,
}
return predict_info, actions
def step(self, action: Dict[str, Any]) -> Tuple[Dict, float, bool, Dict, Dict]:
"""Execute an action in the environment.
Args:
action: Action dictionary with 'action' key containing PyAutoGUI command
Returns:
Tuple of (observation, reward, done, info, step_info)
"""
try:
if not action:
logger.warning("Empty action received, terminating episode")
# Get observation without executing action
obs = self.env._get_obs()
return obs, 0.0, True, {}, {"step_time": 0.0, "action": action}
action_str = action.get("action", "")
logger.info(f"Executing action: {action_str[:100]}...")
with Timer() as step_timer:
# Execute the action directly (it's already a PyAutoGUI command string)
obs, reward, terminated, info = self.env.step(action_str)
logger.debug(f"Action completed in {step_timer.duration:.2f}s")
if terminated:
logger.info("Environment signaled termination")
return obs, reward, terminated, info, {
"step_time": step_timer.duration,
"action": action
}
except Exception as e:
logger.exception(f"Environment step failed: {str(e)}")
raise
def close(self):
"""Close the HTTP client."""
self.client.close()