mirrored 2 minutes ago
0
Hiroid: Add multiple new modules and tools to enhance the functionality and extensibility of the Maestro project (#333) * Added a **pyproject.toml** file to define project metadata and dependencies. * Added **run_maestro.py** and **osworld_run_maestro.py** to provide the main execution logic. * Introduced multiple new modules, including **Evaluator**, **Controller**, **Manager**, and **Sub-Worker**, supporting task planning, state management, and data analysis. * Added a **tools module** containing utility functions and tool configurations to improve code reusability. * Updated the **README** and documentation with usage examples and module descriptions. These changes lay the foundation for expanding the Maestro project’s functionality and improving the user experience. Co-authored-by: Hiroid <guoliangxuan@deepmatrix.com> (commit 3a4b673)
# file_utils.py
import json
import os
import logging
from pathlib import Path
from contextlib import contextmanager
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# ========= File Lock Tools =========
@contextmanager
def locked(path: Path, mode: str):
    """Open ``path`` and hold a file lock on it for the duration of the ``with`` body.

    Text modes are always opened with UTF-8 encoding; modes containing ``'b'``
    are opened without an encoding argument.

    Args:
        path: File to open and lock.
        mode: Mode string passed straight to ``open()``.

    Yields:
        The open file object. It is unlocked and closed when the block exits,
        whether normally or via an exception.
    """
    # Binary handles take no encoding; every text handle is forced to UTF-8.
    open_kwargs = {} if 'b' in mode else {"encoding": "utf-8"}

    if os.name != "nt":
        # Unix-like systems: exclusive flock on the whole file (blocks until granted).
        import fcntl

        handle = open(path, mode, **open_kwargs)
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
            yield handle
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
            handle.close()
    else:
        # Windows: lock a single byte with the non-blocking call and poll
        # every 10 ms until the lock is granted.
        import msvcrt
        import time as _t

        handle = open(path, mode, **open_kwargs)
        try:
            while True:
                try:
                    msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
                except OSError:
                    _t.sleep(0.01)
                else:
                    break
            yield handle
        finally:
            # Rewind before unlocking: the body may have moved the file position.
            # NOTE(review): this assumes the lock was taken at offset 0, which
            # may not hold for append modes - confirm.
            handle.seek(0)
            msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)
            handle.close()

# ========= Safe JSON Operations =========
def safe_json_dump(data: Any, file_handle, **kwargs) -> None:
    """Serialize ``data`` as JSON and write it to ``file_handle``.

    Defaults to ``ensure_ascii=False`` and ``indent=2`` unless the caller
    overrides them. If the handle cannot encode the resulting text, the data
    is re-serialized with ``ensure_ascii=True`` and written again.

    Args:
        data: JSON-serializable object.
        file_handle: Writable text file object.
        **kwargs: Extra keyword arguments forwarded to ``json.dumps``.

    Raises:
        TypeError / ValueError: If ``data`` is not JSON-serializable. Nothing
            is written to ``file_handle`` in that case.
    """
    kwargs.setdefault('ensure_ascii', False)
    kwargs.setdefault('indent', 2)

    # Serialize fully before touching the handle. json.dump() streams the
    # document chunk by chunk, so an encoding failure half-way through would
    # leave a partial document behind and the ASCII retry would then append a
    # second copy after it, producing invalid JSON.
    text = json.dumps(data, **kwargs)
    try:
        file_handle.write(text)
    except UnicodeEncodeError as e:
        logger.warning(f"UnicodeEncodeError during JSON dump: {e}. Falling back to ASCII mode.")
        kwargs['ensure_ascii'] = True
        file_handle.write(json.dumps(data, **kwargs))

def safe_json_load(file_handle) -> Any:
    """Safely load JSON data with proper encoding handling"""
    try:
        return json.load(file_handle)
    except UnicodeDecodeError as e:
        logger.warning(f"UnicodeDecodeError during JSON load: {e}. Attempting recovery.")
        file_handle.seek(0)
        content = file_handle.read()

        # Try common encodings
        for encoding in ['utf-8-sig', 'latin1', 'cp1252']:
            try:
                if isinstance(content, bytes):
                    decoded_content = content.decode(encoding)
                else:
                    decoded_content = content
                return json.loads(decoded_content)
            except (UnicodeDecodeError, json.JSONDecodeError):
                continue

        logger.error("Failed to decode JSON with all attempted encodings. Returning empty data.")
        return {}

def safe_write_json(path: Path, data: Any) -> None:
    """Write ``data`` as JSON to ``path`` via a temporary file and rename.

    The JSON is written to a sibling ``<name>.tmp`` file, flushed and fsynced,
    and then moved over ``path`` with ``Path.replace``.

    Args:
        path: Destination file.
        data: JSON-serializable object.

    Raises:
        Exception: Any error from writing or replacing is logged and
            re-raised after a best-effort removal of the temporary file.
    """
    # Append ".tmp" to the full file name instead of swapping the suffix.
    # with_suffix(".tmp") maps "a.json" and "a.yaml" to the same temp file,
    # and for a target that already ends in ".tmp" the temp path would BE the
    # target, so a failed write would truncate and then delete the real file.
    tmp = path.with_name(path.name + ".tmp")
    try:
        # NOTE(review): the lock is taken on the temp file after open(..., "w")
        # has already truncated it, so it does not fully serialize concurrent
        # writers of the same target.
        with locked(tmp, "w") as f:
            safe_json_dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk before the rename
        tmp.replace(path)
    except Exception as e:
        logger.error(f"Failed to write JSON to {path}: {e}")
        if tmp.exists():
            try:
                tmp.unlink()
            except OSError as cleanup_error:
                # Cleanup is best-effort; the original failure is what gets raised.
                logger.warning(f"Could not remove temporary file {tmp}: {cleanup_error}")
        raise

def safe_read_json(path: Path, default: Any = None) -> Any:
    """Read and parse the JSON file at ``path`` while holding a lock on it.

    Args:
        path: File to read.
        default: Value returned when reading or parsing fails. ``None`` means
            "use an empty list".

    Returns:
        The parsed JSON value, or the fallback (``default``, or ``[]`` when
        ``default`` is ``None``) if anything goes wrong.
    """
    # A None default is deliberately mapped to a fresh empty list.
    fallback = [] if default is None else default
    try:
        with locked(path, "r") as handle:
            return safe_json_load(handle)
    except Exception as exc:
        logger.warning(f"Failed to read JSON from {path}: {exc}")
        return fallback

# ========= Safe Text File Operations =========
def safe_write_text(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8 text.

    If the text contains characters UTF-8 cannot encode, the write is
    repeated with ``errors='replace'`` so those characters are substituted
    rather than aborting the write.

    Args:
        path: Destination file (overwritten).
        content: Text to write.
    """
    try:
        path.write_text(content, encoding='utf-8')
    except UnicodeEncodeError as exc:
        logger.warning(f"UnicodeEncodeError writing to {path}: {exc}. Using error handling.")
        # Second attempt: same target, lossy but guaranteed to encode.
        path.write_text(content, errors='replace', encoding='utf-8')

def safe_read_text(path: Path) -> str:
    """Read ``path`` as text, preferring UTF-8 and falling back to other encodings.

    Args:
        path: File to read.

    Returns:
        The decoded file content. UTF-8 is tried first, then ``utf-8-sig``,
        ``latin1``, ``cp1252`` and ``gbk`` in that order, and finally UTF-8
        with undecodable bytes replaced.
    """
    try:
        return path.read_text(encoding='utf-8')
    except UnicodeDecodeError as exc:
        logger.warning(f"UnicodeDecodeError reading {path}: {exc}. Trying alternative encodings.")

    # NOTE(review): latin1 maps every byte value to a character, so it never
    # raises UnicodeDecodeError; the entries after it and the final
    # errors='replace' read are effectively unreachable.
    for candidate in ('utf-8-sig', 'latin1', 'cp1252', 'gbk'):
        try:
            return path.read_text(encoding=candidate)
        except UnicodeDecodeError:
            pass

    logger.error(f"Failed to decode {path} with all encodings. Using error replacement.")
    return path.read_text(errors='replace', encoding='utf-8')

# ========= File Management Utilities =========
def ensure_directory(path: Path) -> None:
    """Create ``path`` as a directory, together with any missing parents.

    Does nothing if the directory is already there.

    Args:
        path: Directory that must exist after the call.
    """
    path.mkdir(exist_ok=True, parents=True)

def safe_file_operation(operation_name: str, file_path: Path, operation_func, *args, **kwargs):
    """Run ``operation_func(*args, **kwargs)`` and log any failure before re-raising it.

    Args:
        operation_name: Label used as the prefix of the log message.
        file_path: Path mentioned in the log message; it is not passed to
            ``operation_func``.
        operation_func: Callable to invoke.
        *args: Positional arguments for ``operation_func``.
        **kwargs: Keyword arguments for ``operation_func``.

    Returns:
        Whatever ``operation_func`` returns.

    Raises:
        Exception: Any exception from ``operation_func`` is logged at error
            level and re-raised unchanged.
    """
    try:
        outcome = operation_func(*args, **kwargs)
    except Exception as exc:
        # Pick the log message by failure type; the exception itself always propagates.
        if isinstance(exc, FileNotFoundError):
            logger.error(f"{operation_name}: File not found: {file_path}")
        elif isinstance(exc, PermissionError):
            logger.error(f"{operation_name}: Permission denied: {file_path}")
        else:
            logger.error(f"{operation_name}: Unexpected error with {file_path}: {exc}")
        raise
    return outcome

def backup_file(file_path: Path, backup_suffix: str = ".backup") -> Path:
    """Copy ``file_path`` to a sibling whose name has ``backup_suffix`` appended.

    Args:
        file_path: File to back up.
        backup_suffix: Text appended to the full file name (default
            ``".backup"``). It does not have to start with a dot, so
            ``"~"`` or ``"_bak"`` also work.

    Returns:
        The backup path. If ``file_path`` does not exist, no copy is made and
        the path the backup would have had is still returned.

    Raises:
        Exception: Any error raised while copying is logged and re-raised.
    """
    # Append to the whole name rather than going through with_suffix():
    # with_suffix() rejects suffixes that do not start with ".", which made
    # extension-less files (e.g. "Makefile" + "~") raise ValueError. For every
    # input that worked before, the resulting name is the same.
    backup_path = file_path.with_name(file_path.name + backup_suffix)
    try:
        if file_path.exists():
            import shutil
            # copy2 also copies file metadata such as timestamps.
            shutil.copy2(file_path, backup_path)
            logger.info(f"Backup created: {backup_path}")
        return backup_path
    except Exception as e:
        logger.error(f"Failed to create backup of {file_path}: {e}")
        raise