mirrored 5 minutes ago
0
Hiroidfix(maestro): Fixed the debug logging level (#334) Co-authored-by: Liangxuan Guo <guoliangxuan@deepmatrix.com.cn>a668670
import argparse
import json
import datetime
import logging
import os
import sys
import time
import traceback
from pathlib import Path
from tqdm import tqdm
from dotenv import load_dotenv
from multiprocessing import Pool, cpu_count
from functools import partial
from desktop_env.desktop_env import DesktopEnv

# Import from local mm_agents/maestro
try:
    from mm_agents.maestro.maestro.controller.main_controller import MainController
    from mm_agents.maestro.utils.analyze_display import analyze_display_json, format_output_line
    from mm_agents.maestro.utils.common_utils import ImageDataFilter, SafeLoggingFilter
except Exception as e:
    raise ImportError(
        f"Failed to import maestro dependencies, please ensure mm_agents/maestro directory exists. Reason: {e}"
    )

# Load .env from mm_agents/maestro directory
CURRENT_FILE = Path(__file__).resolve()
PROJECT_ROOT = CURRENT_FILE.parent
MAESTRO_ENV_PATH = PROJECT_ROOT / "mm_agents" / "maestro" / ".env"
if MAESTRO_ENV_PATH.exists():
    load_dotenv(dotenv_path=MAESTRO_ENV_PATH)

logger = logging.getLogger()
logger.setLevel(logging.INFO)

vm_datetime_str: str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

log_dir = "runtime"
vm_log_dir = os.path.join(log_dir, f"awsrun_{vm_datetime_str}")
os.makedirs(vm_log_dir, exist_ok=True)

file_handler = logging.FileHandler(
    os.path.join(vm_log_dir, "awsrun_normal.log"), encoding="utf-8"
)
debug_handler = logging.FileHandler(
    os.path.join(vm_log_dir, "awsrun_debug.log"), encoding="utf-8"   
)
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(
    os.path.join(vm_log_dir, "awsrun_sdebug.log"), encoding="utf-8"
)

file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.INFO)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.INFO)

# Safe logging filter
safe_filter = SafeLoggingFilter()
debug_handler.addFilter(safe_filter)
sdebug_handler.addFilter(safe_filter)
file_handler.addFilter(safe_filter)
stdout_handler.addFilter(safe_filter)

# Try to filter third-party library logs
try:
    import openai  # noqa: F401
    openai_logger = logging.getLogger('openai')
    openai_logger.addFilter(safe_filter)
    httpx_logger = logging.getLogger('httpx')
    httpx_logger.addFilter(safe_filter)
except Exception:
    pass

if os.getenv('KEEP_IMAGE_LOGS', 'false').lower() != 'true':
    image_filter = ImageDataFilter()
    debug_handler.addFilter(image_filter)
    sdebug_handler.addFilter(image_filter)
    logging.getLogger().info("Image data filtering enabled - image data in debug logs will be filtered")
else:
    logging.getLogger().info("Image data filtering disabled - debug logs will contain complete image data")

logging.getLogger().info("Safe logging filter enabled - prevents format errors from third-party libraries (OpenAI, HTTPX)")

formatter = logging.Formatter(
    fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)

stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))

logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)

logger = logging.getLogger("desktopenv.experiment")


def config() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Run end-to-end evaluation on the benchmark (maestro integration)"
    )

    current_platform = "Ubuntu"
    test_config_base_dir = os.path.join("evaluation_examples", "examples")
    test_all_meta_path = os.path.join("evaluation_examples", "test_tiny.json")


    # platform config
    parser.add_argument(
        "--current_platform", 
        type=str, 
        choices=["Ubuntu", "Windows"], 
        default=current_platform,
        help="Platform to run on (Ubuntu or Windows)"
    )

    # environment config
    parser.add_argument("--headless", action="store_true", help="Run in headless machine")
    parser.add_argument("--action_space", type=str, default="pyautogui", help="Action type")
    parser.add_argument(
        "--observation_type",
        choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
        default="screenshot",
        help="Observation type",
    )
    parser.add_argument("--max_steps", type=int, default=50)

    # agent config
    parser.add_argument("--test_config_base_dir", type=str, default=test_config_base_dir)

    # password config
    parser.add_argument("--password", type=str, default="osworld-public-evaluation", help="Environment password for sudo operations")

    # example config
    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument("--test_all_meta_path", type=str, default=test_all_meta_path)

    # logging related
    parser.add_argument("--result_dir", type=str, default="./results")
    parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to run in parallel")

    args = parser.parse_args()

    # Convert to absolute paths to avoid cwd dependency
    try:
        repo_root = PROJECT_ROOT
        if not os.path.isabs(args.test_config_base_dir):
            args.test_config_base_dir = str((repo_root / args.test_config_base_dir).resolve())
        if not os.path.isabs(args.test_all_meta_path):
            args.test_all_meta_path = str((repo_root / args.test_all_meta_path).resolve())
    except Exception:
        pass

    return args


def process_with_delay_wrapper(task_with_index_and_args):
    task_info, task_index, args, vm_log_dir, base_timestamp = task_with_index_and_args
    time.sleep(task_index * 5)
    return process_single_task(task_info, args, vm_log_dir, base_timestamp, task_index)


def process_single_task_no_delay(task_with_index_and_args):
    """Worker function to process a single task without delay for queue mode"""
    task_info, task_index, args, vm_log_dir, base_timestamp = task_with_index_and_args
    return process_single_task(task_info, args, vm_log_dir, base_timestamp, task_index)


def process_single_task(task_info, args, vm_log_dir, base_timestamp=None, task_index=0):
    """Worker function to process a single task"""
    domain, example_id, config_file = task_info
    
    try:
        with open(config_file, "r", encoding="utf-8") as f:
            example = json.load(f)

        user_query = example["instruction"]
        
        if base_timestamp:
            example_datetime_str = f"{base_timestamp}_{task_index:03d}"
        else:
            example_datetime_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:17]

        example_result_dir = os.path.join(
            args.result_dir,
            args.action_space,
            args.observation_type,
            domain,
            example_id,
        )
        os.makedirs(example_result_dir, exist_ok=True)

        try:
            run_single_example(
                None,
                example,
                user_query,
                args,
                example_result_dir,
                [],  # scores not needed in worker
                vm_log_dir,
                example_datetime_str,
            )
            
        except Exception as e:
            logger.error(f"Exception in {domain}/{example_id}: {e}")
            with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
                f.write(
                    json.dumps(
                        {"Error": f"Time limit exceeded in {domain}/{example_id}"}
                    )
                )
                f.write("\n")
            result = 0.0
        finally:
            # env is created and managed within run_single_example
            pass
            
        
    except Exception as e:
        logger.error(f"Fatal error in task {domain}/{example_id}: {e}")
        traceback.print_exc()
        return domain, example_id, 0.0


def test(args: argparse.Namespace, test_all_meta: dict) -> None:
    scores = []

    logger.info("Args: %s", args)
    cfg_args = {
        "headless": args.headless,
        "action_space": args.action_space,
        "observation_type": args.observation_type,
        "max_steps": args.max_steps,
        "result_dir": args.result_dir,
    }

    # Prepare tasks list
    tasks = []
    for domain in test_all_meta:
        domain_sanitized = str(domain).strip()
        for example_id in test_all_meta[domain]:
            example_id_sanitized = str(example_id).strip()
            config_file = os.path.join(
                args.test_config_base_dir,
                domain_sanitized,
                f"{example_id_sanitized}.json"
            )

            if not os.path.exists(config_file):
                try:
                    candidate_dir = os.path.join(args.test_config_base_dir, domain_sanitized)
                    existing_files = []
                    if os.path.isdir(candidate_dir):
                        existing_files = os.listdir(candidate_dir)
                    logger.error(f"Config file not found: {config_file}")
                    logger.error(f"Existing files in {candidate_dir}: {existing_files}")
                except Exception as e:
                    logger.error(f"Error while listing directory for debug: {e}")
                raise FileNotFoundError(config_file)

            tasks.append((domain_sanitized, example_id_sanitized, config_file))

    if args.num_envs > 1:
        # Parallel processing with task queue - fixed number of workers
        num_workers = args.num_envs
        logger.info(f"Processing {len(tasks)} tasks with {num_workers} workers in queue mode...")
        
        # Process tasks with fixed worker pool - tasks will queue and wait for available workers
        with Pool(processes=num_workers) as pool:
            results = []
            for i, task in enumerate(tasks):
                base_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                # Add 5 second delay between task submissions
                if i > 0:
                    time.sleep(5)
                
                task_with_args = (task, i, args, vm_log_dir, base_timestamp)
                result = pool.apply_async(process_single_task_no_delay, (task_with_args,))
                results.append(result)
                logger.info(f"Submitted task {i+1}/{len(tasks)}: {task[0]}/{task[1]}")
            
            # Wait for all tasks to complete
            final_results = [result.get() for result in results]
            
    else:
        # Sequential processing (original logic)
        for domain, example_id, config_file in tqdm(tasks, desc="Processing tasks"):
            logger.info(f"[Domain]: {domain}")
            logger.info(f"[Example ID]: {example_id}")

            with open(config_file, "r", encoding="utf-8") as f:
                example = json.load(f)

            user_query = example["instruction"]
            logger.info(f"[User Query]: {user_query}")
            
            cfg_args["user_query"] = user_query
            cfg_args["start_time"] = datetime.datetime.now().strftime(
                "%Y:%m:%d-%H:%M:%S"
            )

            example_datetime_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

            example_result_dir = os.path.join(
                args.result_dir,
                args.action_space,
                args.observation_type,
                domain,
                example_id,
            )
            os.makedirs(example_result_dir, exist_ok=True)

            try:
                run_single_example(
                    None,  # env will be created in run_single_example for sequential mode
                    example,
                    user_query,
                    args,
                    example_result_dir,
                    scores,
                    vm_log_dir,
                    example_datetime_str,
                )
            except Exception as e:
                logger.error(f"Exception in {domain}/{example_id}: {e}")
                # Note: env creation moved to run_single_example for sequential mode


def run_single_example(
    env: DesktopEnv | None,
    example,
    user_query: str,
    args,
    example_result_dir,
    scores,
    vm_log_dir: str,
    example_datetime_str: str,
):
    example_timestamp_dir = os.path.join(vm_log_dir, example_datetime_str)
    total_start_time = time.time()
    cache_dir = os.path.join(example_timestamp_dir, "cache", "screens")
    state_dir = os.path.join(example_timestamp_dir, "state")

    os.makedirs(cache_dir, exist_ok=True)
    os.makedirs(state_dir, exist_ok=True)

    example_logger = setup_example_logger(example, example_timestamp_dir)
    example_logger.info(f"Starting example {example.get('id', 'unknown')}")
    example_logger.info(f"User Query: {user_query}")
    
    # Create environment if not provided (for sequential mode)
    if env is None:
        # Read proxy setting from example config, default to False if not specified
        enable_proxy = example.get("proxy", False)
        logger.info(f"Proxy status: {enable_proxy}")
        env = DesktopEnv(
            provider_name="aws",
            region="us-east-1",
            action_space=args.action_space,
            headless=args.headless,
            require_a11y_tree=False,
            enable_proxy=enable_proxy
        )
    
    env.reset(task_config=example)

    controller = MainController(
        platform=args.current_platform,
        backend="pyautogui_vmware",
        user_query=user_query,
        max_steps=args.max_steps,
        env=env,
        env_password=args.password,
        log_dir=vm_log_dir,
        datetime_str=example_datetime_str,
    )

    try:
        controller.execute_main_loop()
        task = controller.global_state.get_task()
        if task and task.status == "fulfilled":
            logger.info("Task completed successfully")
            env.step("DONE")
        elif task and task.status == "rejected":
            logger.info("Task was rejected/failed")
            env.step("FAIL")
        else:
            logger.info("Task execution completed with unknown status")
            env.step("DONE")
        
        # Retry mechanism for evaluate method
        max_retries = 3
        retry_delay = 5  # seconds
        result = 0
        
        for attempt in range(max_retries):
            try:
                result = env.evaluate()
                logger.info("Result: %.2f", result)
                example_logger.info("Result: %.2f", result)
                example_logger.info(f"Example {example.get('id', 'unknown')} completed with result: {result}")
                break  # Success, exit retry loop
            except Exception as e:
                logger.warning(f"Evaluate attempt {attempt + 1}/{max_retries} failed: {e}")
                if attempt < max_retries - 1:  # Not the last attempt
                    logger.info(f"Waiting {retry_delay} seconds before retry...")
                    time.sleep(retry_delay)
                else:
                    logger.error("All evaluate attempts failed, setting result to 0")
                    result = 0
                    example_logger.info("Result: %.2f", result)
                    example_logger.info(f"Example {example.get('id', 'unknown')} completed with result: {result} (after failed retries)")
        scores.append(result)
        with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f:
            f.write(f"{result}\n")

    except Exception as e:
        logger.error(f"Error during maestro execution: {e}")
        raise
    finally:
        total_end_time = time.time()
        total_duration = total_end_time - total_start_time
        logger.info(f"Total execution time: {total_duration:.2f} seconds")
        auto_analyze_execution(example_timestamp_dir)
        
        env.close()


def auto_analyze_execution(timestamp_dir: str):
    import time as _t
    try:
        display_json_path = os.path.join(timestamp_dir, "display.json")
        max_wait_time = 10
        wait_interval = 0.5
        waited_time = 0
        while waited_time < max_wait_time:
            if os.path.exists(display_json_path):
                try:
                    size1 = os.path.getsize(display_json_path)
                    _t.sleep(wait_interval)
                    size2 = os.path.getsize(display_json_path)
                    if size1 == size2:
                        logger.info(f"Display.json file appears to be complete (size: {size1} bytes)")
                        break
                    else:
                        logger.info(f"Display.json file still being written (size changed from {size1} to {size2} bytes)")
                        waited_time += wait_interval
                        continue
                except OSError:
                    _t.sleep(wait_interval)
                    waited_time += wait_interval
                    continue
            else:
                logger.info(f"Waiting for display.json file to be created... ({waited_time:.1f}s)")
                _t.sleep(wait_interval)
                waited_time += wait_interval
        if os.path.exists(display_json_path):
            logger.info(f"Auto-analyzing execution statistics from: {display_json_path}")
            result = analyze_display_json(display_json_path)
            if result:
                output_line = format_output_line(result)
                logger.info("=" * 80)
                logger.info("EXECUTION STATISTICS:")
                logger.info("Steps, Duration (seconds), (Input Tokens, Output Tokens, Total Tokens), Cost")
                logger.info("=" * 80)
                logger.info(output_line)
                logger.info("=" * 80)
            else:
                logger.warning("No valid data found in display.json for analysis")
        else:
            logger.warning(f"Display.json file not found at: {display_json_path} after waiting {max_wait_time} seconds")
    except Exception as e:
        logger.error(f"Error during auto-analysis: {e}")


def setup_example_logger(example, example_timestamp_dir):
    example_id = example.get('id', 'unknown')
    example_logger = logging.getLogger(f"example.{example_id}.{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
    example_logger.setLevel(logging.DEBUG)

    example_logger.handlers.clear()

    log_file = os.path.join(example_timestamp_dir, "example.log")
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)

    debug_log_file = os.path.join(example_timestamp_dir, "example_debug.log")
    debug_handler = logging.FileHandler(debug_log_file, encoding="utf-8")
    debug_handler.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
    )
    file_handler.setFormatter(formatter)
    debug_handler.setFormatter(formatter)

    example_logger.addHandler(file_handler)
    example_logger.addHandler(debug_handler)

    return example_logger

def get_unfinished(
    action_space, observation_type, result_dir, total_file_json
):
    target_dir = os.path.join(result_dir, action_space, observation_type)

    if not os.path.exists(target_dir):
        return total_file_json

    finished = {}
    for domain in os.listdir(target_dir):
        finished[domain] = []
        domain_path = os.path.join(target_dir, domain)
        if os.path.isdir(domain_path):
            for example_id in os.listdir(domain_path):
                if example_id == "onboard":
                    continue
                example_path = os.path.join(domain_path, example_id)
                if os.path.isdir(example_path):
                    if "result.txt" not in os.listdir(example_path):
                        # empty all files under example_id
                        for file in os.listdir(example_path):
                            os.remove(os.path.join(example_path, file))
                    else:
                        finished[domain].append(example_id)

    if not finished:
        return total_file_json

    for domain, examples in finished.items():
        if domain in total_file_json:
            total_file_json[domain] = [
                x for x in total_file_json[domain] if x not in examples
            ]

    return total_file_json

if __name__ == "__main__":
    """
    xvfb-run -a python run_maestro.py --test_all_meta_path evaluation_examples/test_nogdrive.json --num_envs 15
    """

    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    args = config()

    with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
        test_all_meta = json.load(f)

    if args.domain != "all":
        test_all_meta = {args.domain: test_all_meta[args.domain]}

    test_file_list = get_unfinished(
        args.action_space,
        args.observation_type,
        args.result_dir,
        test_all_meta,
    )

    left_info = ""
    for domain in test_file_list:
        left_info += f"{domain}: {len(test_file_list[domain])}\n"
    logger.info(f"Left tasks:\n{left_info}")

    test(args, test_file_list)