/
OS-Worldb968155
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
import queue
from asyncio import Queue as AsyncQueue
from typing import TYPE_CHECKING, Any
from autogen.io.base import AsyncIOStreamProtocol, IOStreamProtocol
from ..events.agent_events import InputRequestEvent
from ..events.print_event import PrintEvent
class ThreadIOStream:
def __init__(self) -> None:
self._input_stream: queue.Queue = queue.Queue() # type: ignore[type-arg]
self._output_stream: queue.Queue = queue.Queue() # type: ignore[type-arg]
def input(self, prompt: str = "", *, password: bool = False) -> str:
self.send(InputRequestEvent(prompt=prompt, password=password)) # type: ignore[call-arg]
return self._output_stream.get() # type: ignore[no-any-return]
def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
print_message = PrintEvent(*objects, sep=sep, end=end)
self.send(print_message)
def send(self, message: Any) -> None:
self._input_stream.put(message)
@property
def input_stream(self) -> queue.Queue: # type: ignore[type-arg]
return self._input_stream
class AsyncThreadIOStream:
def __init__(self) -> None:
self._input_stream: AsyncQueue = AsyncQueue() # type: ignore[type-arg]
self._output_stream: AsyncQueue = AsyncQueue() # type: ignore[type-arg]
async def input(self, prompt: str = "", *, password: bool = False) -> str:
self.send(InputRequestEvent(prompt=prompt, password=password)) # type: ignore[call-arg]
return await self._output_stream.get() # type: ignore[no-any-return]
def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
print_message = PrintEvent(*objects, sep=sep, end=end)
self.send(print_message)
def send(self, message: Any) -> None:
self._input_stream.put_nowait(message)
@property
def input_stream(self) -> AsyncQueue[Any]:
return self._input_stream
if TYPE_CHECKING:
def check_type_1(x: ThreadIOStream) -> IOStreamProtocol:
return x
def check_type_2(x: AsyncThreadIOStream) -> AsyncIOStreamProtocol:
return x