mirrored 15 minutes ago
0
Linxin SongCoACT initialize (#292) b968155
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0

import inspect
import re
import sys
from abc import ABC, abstractmethod
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import wraps
from logging import getLogger
from pathlib import Path
from typing import Any, Callable, Generator, Generic, Iterable, Optional, TypeVar, Union

__all__ = [
    "optional_import_block",
    "patch_object",
    "require_optional_import",
    "run_for_optional_imports",
    "skip_on_missing_imports",
]

logger = getLogger(__name__)


@dataclass
class ModuleInfo:
    name: str
    min_version: Optional[str] = None
    max_version: Optional[str] = None
    min_inclusive: bool = False
    max_inclusive: bool = False

    def is_in_sys_modules(self) -> Optional[str]:
        """Check if the module is installed and satisfies the version constraints

        Returns:
            None if the module is installed and satisfies the version constraints, otherwise a message indicating the issue.

        """
        if self.name not in sys.modules:
            return f"'{self.name}' is not installed."
        else:
            if hasattr(sys.modules[self.name], "__file__") and sys.modules[self.name].__file__ is not None:
                autogen_path = (Path(__file__).parent).resolve()
                test_path = (Path(__file__).parent.parent / "test").resolve()
                module_path = Path(sys.modules[self.name].__file__).resolve()  # type: ignore[arg-type]

                if str(autogen_path) in str(module_path) or str(test_path) in str(module_path):
                    # The module is in the autogen or test directory
                    # Aka similarly named module in the autogen or test directory
                    return f"'{self.name}' is not installed."

        installed_version = (
            sys.modules[self.name].__version__ if hasattr(sys.modules[self.name], "__version__") else None
        )
        if installed_version is None and (self.min_version or self.max_version):
            return f"'{self.name}' is installed, but the version is not available."

        if self.min_version:
            msg = f"'{self.name}' is installed, but the installed version {installed_version} is too low (required '{self}')."
            if not self.min_inclusive and installed_version == self.min_version:
                return msg
            if self.min_inclusive and installed_version < self.min_version:  # type: ignore[operator]
                return msg

        if self.max_version:
            msg = f"'{self.name}' is installed, but the installed version {installed_version} is too high (required '{self}')."
            if not self.max_inclusive and installed_version == self.max_version:
                return msg
            if self.max_inclusive and installed_version > self.max_version:  # type: ignore[operator]
                return msg

        return None

    def __repr__(self) -> str:
        s = self.name
        if self.min_version:
            s += f">={self.min_version}" if self.min_inclusive else f">{self.min_version}"
        if self.max_version:
            s += f"<={self.max_version}" if self.max_inclusive else f"<{self.max_version}"
        return s

    @classmethod
    def from_str(cls, module_info: str) -> "ModuleInfo":
        """Parse a string to create a ModuleInfo object

        Args:
            module_info (str): A string containing the module name and optional version constraints

        Returns:
            ModuleInfo: A ModuleInfo object with the parsed information

        Raises:
            ValueError: If the module information is invalid
        """

        pattern = re.compile(r"^(?P<name>[a-zA-Z0-9-_]+)(?P<constraint>.*)$")
        match = pattern.match(module_info.strip())

        if not match:
            raise ValueError(f"Invalid package information: {module_info}")

        name = match.group("name")
        constraints = match.group("constraint").strip()
        min_version = max_version = None
        min_inclusive = max_inclusive = False

        if constraints:
            constraint_pattern = re.findall(r"(>=|<=|>|<)([0-9\.]+)?", constraints)

            if not all(version for _, version in constraint_pattern):
                raise ValueError(f"Invalid module information: {module_info}")

            for operator, version in constraint_pattern:
                if operator == ">=":
                    min_version = version
                    min_inclusive = True
                elif operator == "<=":
                    max_version = version
                    max_inclusive = True
                elif operator == ">":
                    min_version = version
                    min_inclusive = False
                elif operator == "<":
                    max_version = version
                    max_inclusive = False
                else:
                    raise ValueError(f"Invalid package information: {module_info}")

        return ModuleInfo(
            name=name,
            min_version=min_version,
            max_version=max_version,
            min_inclusive=min_inclusive,
            max_inclusive=max_inclusive,
        )


class Result:
    def __init__(self) -> None:
        self._failed: Optional[bool] = None

    @property
    def is_successful(self) -> bool:
        if self._failed is None:
            raise ValueError("Result not set")
        return not self._failed


@contextmanager
def optional_import_block() -> Generator[Result, None, None]:
    """Guard a block of code to suppress ImportErrors

    A context manager to temporarily suppress ImportErrors.
    Use this to attempt imports without failing immediately on missing modules.

    Example:
    ```python
    with optional_import_block():
        import some_module
        import some_other_module
    ```
    """
    result = Result()
    try:
        yield result
        result._failed = False
    except ImportError as e:
        # Ignore ImportErrors during this context
        logger.debug(f"Ignoring ImportError: {e}")
        result._failed = True


def get_missing_imports(modules: Union[str, Iterable[str]]) -> dict[str, str]:
    """Get missing modules from a list of module names

    Args:
        modules (Union[str, Iterable[str]]): Module name or list of module names

    Returns:
        List of missing module names
    """
    if isinstance(modules, str):
        modules = [modules]

    module_infos = [ModuleInfo.from_str(module) for module in modules]
    x = {m.name: m.is_in_sys_modules() for m in module_infos}
    return {k: v for k, v in x.items() if v}


T = TypeVar("T")
G = TypeVar("G", bound=Union[Callable[..., Any], type])
F = TypeVar("F", bound=Callable[..., Any])


class PatchObject(ABC, Generic[T]):
    def __init__(self, o: T, missing_modules: dict[str, str], dep_target: str):
        if not self.accept(o):
            raise ValueError(f"Cannot patch object of type {type(o)}")

        self.o = o
        self.missing_modules = missing_modules
        self.dep_target = dep_target

    @classmethod
    @abstractmethod
    def accept(cls, o: Any) -> bool: ...

    @abstractmethod
    def patch(self, except_for: Iterable[str]) -> T: ...

    def get_object_with_metadata(self) -> Any:
        return self.o

    @property
    def msg(self) -> str:
        o = self.get_object_with_metadata()
        plural = len(self.missing_modules) > 1
        fqn = f"{o.__module__}.{o.__name__}" if hasattr(o, "__module__") else o.__name__
        # modules_str = ", ".join([f"'{m}'" for m in self.missing_modules])
        msg = f"{'Modules' if plural else 'A module'} needed for {fqn} {'are' if plural else 'is'} missing:\n"
        for _, status in self.missing_modules.items():
            msg += f" - {status}\n"
        msg += f"Please install {'them' if plural else 'it'} using:\n'pip install ag2[{self.dep_target}]'"
        return msg

    def copy_metadata(self, retval: T) -> None:
        """Copy metadata from original object to patched object

        Args:
            retval: Patched object

        """
        o = self.o
        if hasattr(o, "__doc__"):
            retval.__doc__ = o.__doc__
        if hasattr(o, "__name__"):
            retval.__name__ = o.__name__  # type: ignore[attr-defined]
        if hasattr(o, "__module__"):
            retval.__module__ = o.__module__

    _registry: list[type["PatchObject[Any]"]] = []

    @classmethod
    def register(cls) -> Callable[[type["PatchObject[Any]"]], type["PatchObject[Any]"]]:
        def decorator(subclass: type["PatchObject[Any]"]) -> type["PatchObject[Any]"]:
            cls._registry.append(subclass)
            return subclass

        return decorator

    @classmethod
    def create(
        cls,
        o: T,
        *,
        missing_modules: dict[str, str],
        dep_target: str,
    ) -> Optional["PatchObject[T]"]:
        for subclass in cls._registry:
            if subclass.accept(o):
                return subclass(o, missing_modules, dep_target)
        return None


@PatchObject.register()
class PatchCallable(PatchObject[F]):
    @classmethod
    def accept(cls, o: Any) -> bool:
        return inspect.isfunction(o) or inspect.ismethod(o)

    def patch(self, except_for: Iterable[str]) -> F:
        if self.o.__name__ in except_for:
            return self.o

        f: Callable[..., Any] = self.o

        # @wraps(f.__call__)  # type: ignore[operator]
        @wraps(f)
        def _call(*args: Any, **kwargs: Any) -> Any:
            raise ImportError(self.msg)

        self.copy_metadata(_call)  # type: ignore[arg-type]

        return _call  # type: ignore[return-value]


@PatchObject.register()
class PatchStatic(PatchObject[F]):
    @classmethod
    def accept(cls, o: Any) -> bool:
        # return inspect.ismethoddescriptor(o)
        return isinstance(o, staticmethod)

    def patch(self, except_for: Iterable[str]) -> F:
        if hasattr(self.o, "__name__"):
            name = self.o.__name__
        elif hasattr(self.o, "__func__"):
            name = self.o.__func__.__name__
        else:
            raise ValueError(f"Cannot determine name for object {self.o}")
        if name in except_for:
            return self.o

        f: Callable[..., Any] = self.o.__func__  # type: ignore[attr-defined]

        @wraps(f)
        def _call(*args: Any, **kwargs: Any) -> Any:
            raise ImportError(self.msg)

        self.copy_metadata(_call)  # type: ignore[arg-type]

        return staticmethod(_call)  # type: ignore[return-value]

    def get_object_with_metadata(self) -> Any:
        return self.o.__func__  # type: ignore[attr-defined]


@PatchObject.register()
class PatchInit(PatchObject[F]):
    @classmethod
    def accept(cls, o: Any) -> bool:
        return inspect.ismethoddescriptor(o) and o.__name__ == "__init__"

    def patch(self, except_for: Iterable[str]) -> F:
        if self.o.__name__ in except_for:
            return self.o

        f: Callable[..., Any] = self.o

        @wraps(f)
        def _call(*args: Any, **kwargs: Any) -> Any:
            raise ImportError(self.msg)

        self.copy_metadata(_call)  # type: ignore[arg-type]

        return staticmethod(_call)  # type: ignore[return-value]

    def get_object_with_metadata(self) -> Any:
        return self.o


@PatchObject.register()
class PatchProperty(PatchObject[Any]):
    @classmethod
    def accept(cls, o: Any) -> bool:
        return inspect.isdatadescriptor(o) and hasattr(o, "fget")

    def patch(self, except_for: Iterable[str]) -> property:
        if not hasattr(self.o, "fget"):
            raise ValueError(f"Cannot patch property without getter: {self.o}")
        f: Callable[..., Any] = self.o.fget

        if f.__name__ in except_for:
            return self.o  # type: ignore[no-any-return]

        @wraps(f)
        def _call(*args: Any, **kwargs: Any) -> Any:
            raise ImportError(self.msg)

        self.copy_metadata(_call)

        return property(_call)

    def get_object_with_metadata(self) -> Any:
        return self.o.fget


@PatchObject.register()
class PatchClass(PatchObject[type[Any]]):
    @classmethod
    def accept(cls, o: Any) -> bool:
        return inspect.isclass(o)

    def patch(self, except_for: Iterable[str]) -> type[Any]:
        if self.o.__name__ in except_for:
            return self.o

        for name, member in inspect.getmembers(self.o):
            # Patch __init__ method if possible, but not other internal methods
            if name.startswith("__") and name != "__init__":
                continue
            patched = patch_object(
                member,
                missing_modules=self.missing_modules,
                dep_target=self.dep_target,
                fail_if_not_patchable=False,
                except_for=except_for,
            )
            with suppress(AttributeError):
                setattr(self.o, name, patched)

        return self.o


def patch_object(
    o: T,
    *,
    missing_modules: dict[str, str],
    dep_target: str,
    fail_if_not_patchable: bool = True,
    except_for: Optional[Union[str, Iterable[str]]] = None,
) -> T:
    patcher = PatchObject.create(o, missing_modules=missing_modules, dep_target=dep_target)
    if fail_if_not_patchable and patcher is None:
        raise ValueError(f"Cannot patch object of type {type(o)}")

    except_for = except_for if except_for is not None else []
    except_for = [except_for] if isinstance(except_for, str) else except_for

    return patcher.patch(except_for=except_for) if patcher else o


def require_optional_import(
    modules: Union[str, Iterable[str]],
    dep_target: str,
    *,
    except_for: Optional[Union[str, Iterable[str]]] = None,
) -> Callable[[T], T]:
    """Decorator to handle optional module dependencies

    Args:
        modules: Module name or list of module names required
        dep_target: Target name for pip installation (e.g. 'test' in pip install ag2[test])
        except_for: Name or list of names of objects to exclude from patching
    """
    missing_modules = get_missing_imports(modules)

    if not missing_modules:

        def decorator(o: T) -> T:
            return o

    else:

        def decorator(o: T) -> T:
            return patch_object(o, missing_modules=missing_modules, dep_target=dep_target, except_for=except_for)

    return decorator


def _mark_object(o: T, dep_target: str) -> T:
    import pytest

    markname = dep_target.replace("-", "_")
    pytest_mark_markname = getattr(pytest.mark, markname)
    pytest_mark_o = pytest_mark_markname(o)

    pytest_mark_o = pytest.mark.aux_neg_flag(pytest_mark_o)

    return pytest_mark_o  # type: ignore[no-any-return]


def run_for_optional_imports(modules: Union[str, Iterable[str]], dep_target: str) -> Callable[[G], G]:
    """Decorator to run a test if and only if optional modules are installed

    Args:
        modules: Module name or list of module names
        dep_target: Target name for pip installation (e.g. 'test' in pip install ag2[test])
    """
    # missing_modules = get_missing_imports(modules)
    # if missing_modules:
    #     raise ImportError(f"Missing module{'s' if len(missing_modules) > 1 else ''}: {', '.join(missing_modules)}. Install using 'pip install ag2[{dep_target}]'")

    def decorator(o: G) -> G:
        missing_modules = get_missing_imports(modules)

        if isinstance(o, type):
            wrapped = require_optional_import(modules, dep_target)(o)
        else:
            if inspect.iscoroutinefunction(o):

                @wraps(o)
                async def wrapped(*args: Any, **kwargs: Any) -> Any:
                    if missing_modules:
                        raise ImportError(
                            f"Missing module{'s' if len(missing_modules) > 1 else ''}: {', '.join(missing_modules)}. Install using 'pip install ag2[{dep_target}]'"
                        )
                    return await o(*args, **kwargs)

            else:

                @wraps(o)
                def wrapped(*args: Any, **kwargs: Any) -> Any:
                    if missing_modules:
                        raise ImportError(
                            f"Missing module{'s' if len(missing_modules) > 1 else ''}: {', '.join(missing_modules)}. Install using 'pip install ag2[{dep_target}]'"
                        )
                    return o(*args, **kwargs)

        pytest_mark_o: G = _mark_object(wrapped, dep_target)  # type: ignore[assignment]

        return pytest_mark_o

    return decorator


def skip_on_missing_imports(modules: Union[str, Iterable[str]], dep_target: str) -> Callable[[T], T]:
    """Decorator to skip a test if an optional module is missing

    Args:
        modules: Module name or list of module names
        dep_target: Target name for pip installation (e.g. 'test' in pip install ag2[test])
    """
    import pytest

    missing_modules = get_missing_imports(modules)

    if not missing_modules:

        def decorator(o: T) -> T:
            pytest_mark_o = _mark_object(o, dep_target)
            return pytest_mark_o  # type: ignore[no-any-return]

    else:

        def decorator(o: T) -> T:
            pytest_mark_o = _mark_object(o, dep_target)

            return pytest.mark.skip(  # type: ignore[return-value,no-any-return]
                f"Missing module{'s' if len(missing_modules) > 1 else ''}: {', '.join(missing_modules)}. Install using 'pip install ag2[{dep_target}]'"
            )(pytest_mark_o)

    return decorator