/
OS-Worlda4f8fe2"""Script to run end-to-end evaluation on the benchmark.
Utils and basic architecture credit to https://github.com/web-arena-x/webarena/blob/main/run.py.
"""
import argparse
import datetime
import json
import logging
import os
import sys
import math
import ast
import time
import backoff
import httpx
import requests
from tqdm import tqdm
from typing import Optional, Dict, Any
from multiprocessing import Pool
from openai import APIConnectionError, APIError, RateLimitError
from types import SimpleNamespace
import lib_run_single
from run_autoglm_v import DesktopEnv, get_unfinished, get_result
from desktop_env.desktop_env import MAX_RETRIES, DesktopEnv as DesktopEnvBase
from mm_agents.autoglm_v import AutoGLMAgent
from openai import OpenAI
logger = logging.getLogger("desktopenv.experiment")
def config() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run end-to-end evaluation on the benchmark")
    # environment config
    parser.add_argument("--path_to_vm", type=str)
    parser.add_argument(
        "--provider_name",
        type=str,
        default="docker",
        help="Virtualization provider (vmware, docker, aws, azure, gcp, virtualbox)",
    )
    parser.add_argument("--headless", action="store_true", default=True, help="Run in headless machine")
    parser.add_argument("--action_space", type=str, default="autoglm_computer_use", help="Action type")
    parser.add_argument(
        "--observation_type",
        choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
        default="a11y_tree",
        help="Observation type",
    )
    parser.add_argument("--screen_width", type=int, default=1920)
    parser.add_argument("--screen_height", type=int, default=1080)
    parser.add_argument("--sleep_after_execution", type=float, default=1.0)
    parser.add_argument("--max_steps", type=int, default=30)
    # agent config
    parser.add_argument("--max_trajectory_length", type=int, default=3)
    parser.add_argument("--test_config_base_dir", type=str, default="evaluation_examples/examples")
    # lm config
    parser.add_argument("--model", type=str, default="autoglm-os")
    parser.add_argument("--temperature", type=float, default=0.4)
    parser.add_argument("--top_p", type=float, default=0.5)
    parser.add_argument("--max_tokens", type=int, default=2048)
    parser.add_argument("--stop_token", type=str, default=None)
    parser.add_argument("--image_width", type=int, default=1280)
    parser.add_argument("--image_height", type=int, default=720)
    # example config
    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument("--test_all_meta_path", type=str, default="evaluation_examples/test_nogdrive.json")
    # aws config
    parser.add_argument(
        "--region", type=str, default="us-east-1", help="AWS region for the VM"
    )
    parser.add_argument("--client_password", type=str, default="", help="Client password")
    # logging related
    parser.add_argument("--result_dir", type=str, default="./results")
    
    # parallel number
    parser.add_argument("--num_workers", type=int, default=20, help="Number of parallel workers")
    args = parser.parse_args()
    return args
def _worker_run(task):
    domain, example_id, args = task  # args 为 argparse.Namespace
    logger = logging.getLogger("desktopenv.experiment")
    try:
        config_file = os.path.join(args.test_config_base_dir, f"{domain}/{example_id}.json")
        with open(config_file, "r", encoding="utf-8") as f:
            example = json.load(f)
        instruction = example["instruction"]
        @backoff.on_exception(backoff.constant, (RateLimitError, APIConnectionError), interval=0.1)
        def call_llm(messages):
            logger.info("Calling LLM...")
            
            # Prepare the request data
            data = {
                "model": args.model,
                "messages": messages,
                "max_tokens": args.max_tokens,
                "temperature": args.temperature,
                "top_p": args.top_p,
                "skip_special_tokens": False,
                "stream": False,
                "include_stop_str_in_output": True,
                "stop": ["<|user|>", "<|observation|>", "</answer>"]
            }
            
            # Set up proxy
            # if os.environ.get('LAN_PROXY', None):
            #     proxies = {
            #         "http": os.environ.get('LAN_PROXY'),
            #         "https": os.environ.get('LAN_PROXY')
            #     }
            # else:
            #     proxies = None
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY', '')}"
            }
            
            # Get API base URL from environment or use default
            base_url = os.environ.get('OPENAI_BASE_URL', 'https://api.openai.com/v1')
            url = f"{base_url}/chat/completions"
            
            response = requests.post(
                url,
                json=data,
                headers=headers,
                # proxies=proxies,
                timeout=60.0
            )
            response.raise_for_status()
            
            result = response.json()
            logger.info("LLM called successfully.")
            return result['choices'][0]['message']['content']
        env = DesktopEnv(
            provider_name=args.provider_name,
            region=args.region,
            client_password=args.client_password,
            path_to_vm=args.path_to_vm,
            action_space=args.action_space,
            screen_size=(args.screen_width, args.screen_height),
            headless=args.headless,
            os_type="Ubuntu",
            require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"],
        )
        agent = AutoGLMAgent(
            action_space=args.action_space,
            observation_type=args.observation_type,
            screen_size=(args.screen_width, args.screen_height),
            image_size=(args.image_width, args.image_height),
            max_trajectory_length=args.max_trajectory_length,
            client_password=args.client_password,
            gen_func=call_llm,
        )
        example_result_dir = os.path.join(
            args.result_dir,
            args.action_space,
            args.observation_type,
            args.model,
            domain,
            example_id,
        )
        os.makedirs(example_result_dir, exist_ok=True)
        local_scores = []
        try:
            lib_run_single.run_single_example_autoglm(
                agent,
                env,
                example,
                args.max_steps,
                instruction,
                args,
                example_result_dir,
                local_scores,
            )
        except Exception as e:
            logger.error(f"[并发任务异常] {domain}/{example_id}: {e}")
            if hasattr(env, "controller") and env.controller is not None:
                try:
                    env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4"))
                except Exception:
                    pass
            with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
                f.write(json.dumps({"Error": f"Exception in {domain}/{example_id}: {str(e)}"}) + "\n")
        finally:
            try:
                env.close()
            except Exception:
                pass
        score = None
        result_path = os.path.join(example_result_dir, "result.txt")
        if os.path.exists(result_path):
            try:
                with open(result_path, "r") as rf:
                    res = rf.read().strip()
                    if res.lower() == "true":
                        score = 1.0
                    else:
                        score = float(res)
            except Exception:
                score = 0.0
        else:
            score = 0.0
        logger.info(f"[Finish] {domain}/{example_id} score={score}")
        return (domain, example_id, score)
    except Exception as e:
        logger = logging.getLogger("desktopenv.experiment")
        logger.error(f"[Initializing Fail] {domain}/{example_id}: {e}")
        return (domain, example_id, 0.0)
def test_parallel(args: argparse.Namespace, test_all_meta: dict):
    tasks = []
    for domain in test_all_meta:
        for example_id in test_all_meta[domain]:
            tasks.append((domain, example_id, args))
    if not tasks:
        logger.info("No pending tasks")
        return
    logger.info(f"Starting parallel execution: {args.num_workers} processes, {len(tasks)} tasks total")
    results = []
    with Pool(processes=args.num_workers) as pool:
        for res in tqdm(pool.imap_unordered(_worker_run, tasks), total=len(tasks), desc="Parallel execution"):
            results.append(res)
    scores = [s for (_, _, s) in results if s is not None]
    if scores:
        avg = sum(scores) / len(scores)
        logger.info(f"Parallel execution completed. Average score: {avg}")
    else:
        logger.info("No scores obtained.")
if __name__ == "__main__":
    ####### The complete version of the list of examples #######
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    args = config()
    if args.client_password == "":
        if args.provider_name == "aws":
            args.client_password = "osworld-public-evaluation"
        else:
            args.client_password = "password"
    else:
        args.client_password = args.client_password
    # save args to json in result_dir/action_space/observation_type/model/args.json
    path_to_args = os.path.join(
        args.result_dir,
        args.action_space,
        args.observation_type,
        args.model,
        "args.json",
    )
    os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
    with open(path_to_args, "w", encoding="utf-8") as f:
        json.dump(vars(args), f, indent=4)
    with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
        test_all_meta = json.load(f)
    if args.domain != "all":
        test_all_meta = {args.domain: test_all_meta[args.domain]}
    test_file_list = get_unfinished(
        args.action_space,
        args.model,
        args.observation_type,
        args.result_dir,
        test_all_meta,
    )
    left_info = ""
    for domain in test_file_list:
        left_info += f"{domain}: {len(test_file_list[domain])}\n"
    logger.info(f"Left tasks:\n{left_info}")
    get_result(
        args.action_space,
        args.model,
        args.observation_type,
        args.result_dir,
        test_all_meta,
    )
    test_parallel(args, test_file_list)