mirrored 12 minutes ago
0
Linxin SongCoACT initialize (#292) b968155
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from  https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
import logging
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Optional, Protocol, Union, runtime_checkable

from ..doc_utils import export_module
from ..events.base_event import BaseEvent

__all__ = ("IOStream", "InputStream", "OutputStream")

logger = logging.getLogger(__name__)


@runtime_checkable
@export_module("autogen.io")
class OutputStream(Protocol):
    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        """Print data to the output stream.

        Args:
            objects (any): The data to print.
            sep (str, optional): The separator between objects. Defaults to " ".
            end (str, optional): The end of the output. Defaults to "\n".
            flush (bool, optional): Whether to flush the output. Defaults to False.
        """
        ...  # pragma: no cover

    def send(self, message: BaseEvent) -> None:
        """Send data to the output stream.

        Args:
            message (BaseEvent): BaseEvent from autogen.messages.base_message
        """
        ...


@runtime_checkable
@export_module("autogen.io")
class InputStream(Protocol):
    def input(self, prompt: str = "", *, password: bool = False) -> str:
        """Read a line from the input stream.

        Args:
            prompt (str, optional): The prompt to display. Defaults to "".
            password (bool, optional): Whether to read a password. Defaults to False.

        Returns:
            str: The line read from the input stream.

        """
        ...  # pragma: no cover


@runtime_checkable
@export_module("autogen.io")
class AsyncInputStream(Protocol):
    async def input(self, prompt: str = "", *, password: bool = False) -> str:
        """Read a line from the input stream.

        Args:
            prompt (str, optional): The prompt to display. Defaults to "".
            password (bool, optional): Whether to read a password. Defaults to False.

        Returns:
            str: The line read from the input stream.

        """
        ...  # pragma: no cover


@runtime_checkable
@export_module("autogen.io")
class IOStreamProtocol(InputStream, OutputStream, Protocol):
    """A protocol for input/output streams."""


@runtime_checkable
@export_module("autogen.io")
class AsyncIOStreamProtocol(AsyncInputStream, OutputStream, Protocol):
    """A protocol for input/output streams."""


iostream_union = Union[IOStreamProtocol, AsyncIOStreamProtocol]


@export_module("autogen.io")
class IOStream:
    """A protocol for input/output streams."""

    # ContextVar must be used in multithreaded or async environments
    _default_io_stream: ContextVar[Optional[iostream_union]] = ContextVar("default_iostream", default=None)
    _default_io_stream.set(None)
    _global_default: Optional[iostream_union] = None

    @staticmethod
    def set_global_default(stream: iostream_union) -> None:
        """Set the default input/output stream.

        Args:
            stream (IOStream): The input/output stream to set as the default.
        """
        IOStream._global_default = stream

    @staticmethod
    def get_global_default() -> iostream_union:
        """Get the default input/output stream.

        Returns:
            IOStream: The default input/output stream.
        """
        if IOStream._global_default is None:
            raise RuntimeError("No global default IOStream has been set")
        return IOStream._global_default

    @staticmethod
    def get_default() -> iostream_union:
        """Get the default input/output stream.

        Returns:
            IOStream: The default input/output stream.
        """
        iostream = IOStream._default_io_stream.get()
        if iostream is None:
            iostream = IOStream.get_global_default()
            # Set the default IOStream of the current context (thread/cooroutine)
            IOStream.set_default(iostream)
        return iostream

    @staticmethod
    @contextmanager
    def set_default(stream: Optional[iostream_union]) -> Iterator[None]:
        """Set the default input/output stream.

        Args:
            stream (IOStream): The input/output stream to set as the default.
        """
        global _default_io_stream
        try:
            token = IOStream._default_io_stream.set(stream)
            yield
        finally:
            IOStream._default_io_stream.reset(token)

        return