mirrored 14 minutes ago
0
Your Nameadd terminal context manager, remove license 1e84e8a
"""
Terminal executor

Code source: [terminal_asciicast_record_executor.py](https://github.com/James4Ever0/agi_computer_control/blob/master/web_gui_terminal_recorder/executor_and_replayer/terminal_asciicast_record_executor.py) (modified)
"""

# pending pypi project name: termexec
# if you want to release this code as a pypi package, you must create a through unit test, across launching terminal, docker container, network requests, etc. focus on the effectiveness of clean-up, and the ability to handle exceptions.

import threading
import time
import os
import signal
import atexit

import agg_python_bindings
import ptyprocess
from pydantic import BaseModel
from typing import Protocol, cast


class Cursor(BaseModel):
    """Cursor position and visibility.

    Attributes:
        x (int): The x-coordinate (column) of the cursor.
        y (int): The y-coordinate (row/line) of the cursor.
        hidden (bool): True if the cursor is hidden, False otherwise.
    """

    x: int
    y: int
    hidden: bool


def decode_bytes_to_utf8_safe(_bytes: bytes):
    """
    Decode with UTF-8, but replace errors with a replacement character (�).
    """
    ret = _bytes.decode("utf-8", errors="replace")
    return ret


# screen init params: width, height
# screen traits: write_bytes, display, screenshot


class _TerminalScreenProtocol(Protocol):
    def write_bytes(self, _bytes: bytes): ...
    @property
    def display(self) -> str: ...
    def screenshot(self, png_output_path: str): ...
    def close(self): ...
    @property
    def cursor(self) -> Cursor: ...


class AvtScreen:
    def __init__(self, width: int, height: int):
        """
        Initialize an AvtScreen object.

        Args:
            width (int): The width of the virtual terminal emulator.
            height (int): The height of the virtual terminal emulator.
        """
        self.vt = agg_python_bindings.TerminalEmulator(width, height)
        """terminal emulator provided by avt"""
        self._closing = False

    def write_bytes(self, _bytes: bytes):
        """
        Write the given bytes to the virtual terminal emulator.

        The bytes are decoded with UTF-8 and any decoding errors are replaced with a
        replacement character (�).

        Args:
            _bytes (bytes): The bytes to be written to the virtual terminal emulator.
        """
        decoded_bytes = decode_bytes_to_utf8_safe(_bytes)
        self.vt.feed_str(decoded_bytes)

    @property
    def cursor(self):
        """
        Get the current position of the cursor as a `Cursor` object.

        Returns:
            Cursor: A `Cursor` object with `x` and `y` properties for the current column and row of the cursor, respectively, and a `hidden` property which is `True` if the cursor is hidden and `False` otherwise.
        """
        col, row, visible = self.vt.get_cursor()
        ret = Cursor(x=col, y=row, hidden=not visible)
        return ret

    @property
    def display(self):
        """
        Get the current display of the terminal emulator as a string.

        Returns:
            str: A string representation of the current display of the terminal emulator.
        """
        ret = "\n".join(self.vt.text_raw())
        return ret

    def screenshot(self, png_output_path: str):
        """
        Saves the current state of the terminal emulator to a PNG image file.

        The image has the same width and height as the terminal emulator.

        Args:
            png_output_path: The path to write the PNG image to.
        """
        self.vt.screenshot(png_output_path)

    def close(self):
        """
        Releases all resources used by the terminal emulator.

        This is necessary to avoid crashes when creating multiple instances of this class.
        """
        if not self._closing:
            self._closing = True
            del self.vt

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()


class TerminalProcess:
    def __init__(self, command: list[str], width: int, height: int, backend="avt"):
        """
        Initializes the terminal emulator with a command to execute.

        Args:
            command (list[str]): List of command strings to execute in the terminal
            width (int): Width of the terminal emulator
            height (int): Height of the terminal emulator
            backend (str, optional): Backend to use for terminal emulator. Defaults to "avt".
        """
        self._closing = False
        rows, cols = height, width
        self.pty_process: ptyprocess.PtyProcess = cast(
            ptyprocess.PtyProcess,
            ptyprocess.PtyProcess.spawn(command, dimensions=(rows, cols)),
        )
        """a process executing command in a pseudo terminal"""

        if backend == "avt":
            self.vt_screen = AvtScreen(width=width, height=height)
            """virtual terminal screen"""
        else:
            raise ValueError(
                "Unknown terminal emulator backend '%s' (known ones: avt, pyte)"
                % backend
            )

        self.vt_screen = cast(_TerminalScreenProtocol, self.vt_screen)

        self.__pty_process_reading_thread = threading.Thread(
            target=self.__read_and_update_screen, daemon=True
        )
        self.__start_ptyprocess_reading_thread()
        atexit.register(self.close)

    def __start_ptyprocess_reading_thread(self):
        """Starts a thread to read output from the terminal process and update the Pyte screen"""
        self.__pty_process_reading_thread.start()

    def write(self, data: bytes):
        """Writes input data to the terminal process"""
        self.pty_process.write(data)

    def close(self):
        """Closes the terminal process and the reading thread"""
        if not self._closing:
            self._closing = True
            os.kill(self.pty_process.pid, signal.SIGTERM)
            time.sleep(0.5)
            if self.pty_process.isalive:
                os.kill(self.pty_process.pid, signal.SIGKILL)
            self.vt_screen.close()
            self.__pty_process_reading_thread.join(timeout=0.5)

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()

    def __read_and_update_screen(self, poll_interval=0.01):
        """Reads available output from terminal and updates Pyte screen

        Args:
            poll_interval (float, optional): Interval in seconds to poll for available output. Defaults to 0.01.
        """
        while True:
            try:
                # ptyprocess.read is blocking. only pexpect has read_nonblocking
                process_output_bytes = self.pty_process.read(1024)
                # write bytes to pyte screen
                self.vt_screen.write_bytes(process_output_bytes)
            except KeyboardInterrupt:  # user interrupted
                break
            except SystemExit:  # python process exit
                break
            except SystemError:  # python error
                break
            except EOFError:  # terminal died
                break
            except:
                # Timeout means no data available, EOF means process ended
                pass
            finally:
                time.sleep(poll_interval)


class TerminalExecutor:
    def __init__(self, command: list[str], width: int, height: int):
        """
        Initializes executor with a command to run in terminal emulator, using avt as backend.

        Args:
            command (list[str]): List of command strings to execute
            width (int): Width of the terminal emulator
            height (int): Height of the terminal emulator
        """
        self.terminal = TerminalProcess(command=command, width=width, height=height)
        """a terminal process, running command in pty screen"""
        self._closing = False

    def input(self, text: str):
        """
        Sends input text to the terminal process

        Args:
            text (str): The input text to send
        """
        self.terminal.write(text.encode())
        # Allow time for processing output
        time.sleep(0.1)

    @property
    def display(self) -> str:
        """
        Get the current display of the terminal emulator as a string.
        """

        return self.terminal.vt_screen.display

    def screenshot(self, png_save_path: str):
        """
        Saves the current display of the terminal emulator as a .png file

        Args:
            png_save_path (str): The path to save the screenshot to
        """
        self.terminal.vt_screen.screenshot(png_save_path)

    def close(self):
        """
        Closes the terminal emulator process and the associated reading thread.
        """
        if not self._closing:
            self._closing = True
            self.terminal.close()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc, value, tb):
        self.close()


def test_harmless_command_locally_with_bash():
    """
    Tests the TerminalExecutor class with a harmless command by running a docker alpine container
    and executing a series of input events, then taking a screenshot and dumping the terminal
    display to a file.
    """
    SLEEP_INTERVAL = 0.5
    command = ["docker", "run", "--rm", "-it", "alpine"]
    input_events = ['echo "Hello World!"', "\n"]
    executor = TerminalExecutor(command=command, width=80, height=24)
    time.sleep(1)
    for event in input_events:
        executor.input(event)
        time.sleep(SLEEP_INTERVAL)
    # check for screenshot, text dump
    text_dump = executor.display
    print("Dumping terminal display to ./terminal_executor_text_dump.txt")
    with open("./terminal_executor_text_dump.txt", "w+") as f:
        f.write(text_dump)
    print("Taking terminal screenshot at ./terminal_executor_screenshot.png")
    executor.screenshot("./terminal_executor_screenshot.png")
    print("Done")


def test():
    """
    Runs a test for the TerminalExecutor class by running a harmless command
    locally with bash and taking a screenshot and dumping the terminal display
    to a file.

    This test is useful for checking that the TerminalExecutor class works in
    a real environment.
    """
    test_harmless_command_locally_with_bash()


if __name__ == "__main__":
    test()